Skip to content
This repository has been archived by the owner on Apr 6, 2021. It is now read-only.

Fixsmithio #32

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "smith.io",
"version": "0.0.38",
"version": "0.0.39",
"author": "ajax.org B.V. <[email protected]>",
"contributors": [
{
Expand Down
59 changes: 47 additions & 12 deletions server-plugin/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const EVENTS = require("events");

// Switch from `away` to `disconnect` after this many milliseconds.
const RECONNECT_TIMEOUT = 60 * 1000;

const THRESHOLD = 300; // Time to detect the connection is gone

var engines = [];

Expand Down Expand Up @@ -49,6 +49,9 @@ module.exports = function startup(options, imports, register) {

var gee = new EVENTS.EventEmitter();

if (options.debug)
SMITH.debug = true;

if (options.messageRoute) {

var serverId = "server-id-" + Date.now();
Expand Down Expand Up @@ -76,7 +79,6 @@ module.exports = function startup(options, imports, register) {
}

engine.on("connection", function (socket) {

if (match && !match(socket.transport.request.url)) {
return;
}
Expand All @@ -92,35 +94,57 @@ module.exports = function startup(options, imports, register) {
delete timeouts[id];
}
if (!connections[id]) {
buffers[id] = [];
buffers["_" + id] = [];
connections[id] = {
ee: new EVENTS.EventEmitter(),
transport: transport
transport: transport,
sequence: 0
};

gee.emit("connect", {
id: id,
transport: connections[id].transport,
on: connections[id].ee.on.bind(connections[id].ee),
once: connections[id].ee.once.bind(connections[id].ee),
send: function(message) {
if (timeouts[id]) {
if (!buffers[id]) {
buffers[id] = [];
}
// Sequence number used to catch duplicates
if (message.push) {
message.push(++connections[id].sequence);
if (connections[id].sequence > 30000)
connections[id].sequence = 0;
}

var transport = connections[id].transport;
if (timeouts[id] || transport.socket.readyState != "open") {
buffers[id].push(message);
} else if (connections[id]) {
connections[id].transport.send(message);
}
else if (connections[id]) {
// Clear Existing Buffer > THRESHOLD
var now = Date.now();
var items = buffers["_" + id];
for (var i = items.length - 1; i >= 0; i--) {
if (now - items[i][1] > THRESHOLD)
items.splice(i, 1);
}

// Add item to buffer
items.push([message, Date.now()]);

// Send message
transport.send(message);
}
}
});
} else {
connections[id].transport = transport;
if (buffers[id]) {
buffers[id].forEach(function(message) {
connections[id].transport.send(message);
transport.send(message);
});
delete buffers[id];
buffers[id] = [];
}
connections[id].ee.emit("back");
connections[id].ee.emit("back", {transport: transport});
}
} else if (connections[id]) {
connections[id].ee.emit("message", message);
Expand All @@ -138,8 +162,19 @@ module.exports = function startup(options, imports, register) {
connections[id].ee.emit("disconnect", reason);
delete connections[id];
}
delete buffers[id];
delete buffers["_" + id];
id = false;
}, RECONNECT_TIMEOUT);

var now = Date.now();
buffers["_" + id].forEach(function(iter){
if (now - iter[1] < THRESHOLD) {
buffers[id].push(iter[0]);
}
});
buffers["_" + id] = [];

connections[id].ee.emit("away");
});

Expand Down
131 changes: 90 additions & 41 deletions server-plugin/www/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ define(function(require, exports, module) {
var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`.
var SMITH = require("smith");
var EVENTS = require("smith/events-amd");

var transports = [];
var debugHandler = null;
var connectCounter = 0;
var THRESHOLD = 300;

function inherits(Child, Parent) {
Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }});
Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }});
}

function getLogTimestamp() {
Expand All @@ -35,6 +36,9 @@ define(function(require, exports, module) {
this.away = false;
this.buffer = false;
this.connectIndex = -1;
this.sequence = 0;
this.buffer = [];
this.sbuffer = [];
}

inherits(Transport, EVENTS.EventEmitter);
Expand Down Expand Up @@ -86,8 +90,8 @@ define(function(require, exports, module) {

if (options.reconnectAttempt === 6) {
_self.away = false;
_self.connected = false;
try {
_self.connected = false;
try {
_self.emit("disconnect", "away re-connect attempts exceeded");
} catch(err) {
console.error(err.stack);
Expand Down Expand Up @@ -142,7 +146,24 @@ define(function(require, exports, module) {
}, delay);
}

var writeBuffer = _self.socket && _self.socket.writeBuffer;
_self.socket = new ENGINE_IO.Socket(_self.options);
if (writeBuffer) {
var onback, ondisconnect;
_self.on("back", onback = function(){
writeBuffer.forEach(function(packet){
_self.socket.sendPacket(packet.type, packet.data);
});

_self.removeListener("back", onback);
_self.removeListener("disconnect", ondisconnect);
});
_self.on("disconnect", ondisconnect = function(){
writeBuffer = null;
_self.removeListener("back", onback);
_self.removeListener("disconnect", ondisconnect);
});
}

_self.socket.on("error", function (err) {
if (_self.debug) {
Expand Down Expand Up @@ -183,8 +204,8 @@ define(function(require, exports, module) {
return;
} else
if (pongPayload && pongPayload.serverId && pongPayload.serverId !== _self.serverId) {
// If `pongPayload.serverId` does not match our cached `_self.serverId` we close
// the connection and re-connect as the server instance has changed and we may need to re-init.
// If `pongPayload.serverId` does not match our cached `_self.serverId` we close
// the connection and re-connect as the server instance has changed and we may need to re-init.
if (_self.debug) {
log("Detected server reboot on heartbeat. Close connection.");
}
Expand Down Expand Up @@ -235,11 +256,11 @@ define(function(require, exports, module) {
_self.transport = new SMITH.EngineIoTransport(_self.socket);

_self.transport.on("legacy", function (message) {
if (typeof message === "object" && message.type === "__ASSIGN-ID__") {
if (typeof message === "object" && message.type === "__ASSIGN-ID__") {

if (_self.serverId !== false && _self.serverId !== message.serverId) {
// If `message.serverId` does not match our cached `_self.serverId` we issue
// a connect as the server instance has changed and we may need to re-init.
if (_self.serverId !== false && _self.serverId !== message.serverId) {
// If `message.serverId` does not match our cached `_self.serverId` we issue
// a connect as the server instance has changed and we may need to re-init.
if (_self.debug) {
log("Detected server reboot on handshake. Issue re-connect.");
}
Expand All @@ -252,17 +273,17 @@ define(function(require, exports, module) {
console.error(err.stack);
}
}
}
_self.serverId = message.serverId;

if (_self.id === false) {
_self.id = message.id;
}
_self.transport.send({
type: "__ANNOUNCE-ID__",
id: _self.id
});
if (_self.away && (Date.now()-_self.away) > 30*1000) {
}
_self.serverId = message.serverId;

if (_self.id === false) {
_self.id = message.id;
}
_self.transport.send({
type: "__ANNOUNCE-ID__",
id: _self.id
});
if (_self.away && (Date.now()-_self.away) > 30*1000) {
if (_self.debug) {
log("Long away (hibernate) detected. Issue re-connect.");
}
Expand All @@ -275,37 +296,38 @@ define(function(require, exports, module) {
console.error(err.stack);
}
}
}
_self.away = false;
_self.connected = true;
if (options.fireConnect !== false) {
try {
}
_self.away = false;
_self.connected = true;
if (options.fireConnect !== false) {
try {
_self.emit("connect", _self);
} catch(err) {
console.error(err.stack);
}
}
else if (options.reconnectAttempt > 0) {
if (_self.buffer) {
_self.buffer.forEach(function(message) {
_self.transport.send(message);
});
_self.buffer = [];
}

try {
_self.emit("back");
} catch(err) {
console.error(err.stack);
}
}
}
options.reconnectAttempt = 0;
if (_self.buffer) {
_self.buffer.forEach(function(message) {
_self.transport.send(message);
});
_self.buffer = false;
}
} else {
try {
} else {
try {
_self.emit("message", message);
} catch(err) {
console.error(err.stack);
}
}
}
});

_self.transport.on("disconnect", ondisconnect);
Expand All @@ -325,6 +347,15 @@ define(function(require, exports, module) {

if (_self.connected) {
_self.away = Date.now();

var now = Date.now();
_self.sbuffer.forEach(function(iter){
if (now - iter[1] < THRESHOLD) {
_self.buffer.push(iter[0]);
}
});
_self.sbuffer = [];

try {
_self.emit("away");
} catch(err) {
Expand All @@ -348,13 +379,31 @@ define(function(require, exports, module) {
console.log(err.stack);
throw err;
}
else if(this.away) {
if (!this.buffer) {
this.buffer = [];
}

if (message.push) {
message.push(++this.sequence);
if (this.sequence > 30000)
this.sequence = 0;
}

if (this.away) {
this.buffer.push(message);
}
this.transport.send(message);
else {
// Clear Existing Buffer > THRESHOLD
var now = Date.now();
var items = this.sbuffer;
for (var i = items.length - 1; i >= 0; i--) {
if (now - items[i][1] > THRESHOLD)
items.splice(i, 1);
}

// Add item to buffer
items.push([message, Date.now()]);

// Send message
this.transport.send(message);
}
}

exports.connect = function(options, callback) {
Expand Down