From e123dcac47b31d3275d2bbbe95639abd138a5a24 Mon Sep 17 00:00:00 2001 From: Ruben Daniels Date: Tue, 21 May 2013 18:14:27 +0200 Subject: [PATCH 1/7] * Fixes reconnect issues with smith in client situation. --- package.json | 2 +- server-plugin/plugin.js | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 7348b52..a25c6d3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "smith.io", - "version": "0.0.38", + "version": "0.0.39", "author": "ajax.org B.V. ", "contributors": [ { diff --git a/server-plugin/plugin.js b/server-plugin/plugin.js index 60799d9..e68aa76 100644 --- a/server-plugin/plugin.js +++ b/server-plugin/plugin.js @@ -76,7 +76,6 @@ module.exports = function startup(options, imports, register) { } engine.on("connection", function (socket) { - if (match && !match(socket.transport.request.url)) { return; } @@ -120,7 +119,7 @@ module.exports = function startup(options, imports, register) { }); delete buffers[id]; } - connections[id].ee.emit("back"); + connections[id].ee.emit("back", {transport: transport}); } } else if (connections[id]) { connections[id].ee.emit("message", message); From 2c1f0e54208d345397410a1aadd706f7b6a0a49d Mon Sep 17 00:00:00 2001 From: Ruben Daniels Date: Tue, 2 Jul 2013 14:53:30 +0000 Subject: [PATCH 2/7] Updated how the debug setting works --- server-plugin/plugin.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server-plugin/plugin.js b/server-plugin/plugin.js index e68aa76..ffabb1a 100644 --- a/server-plugin/plugin.js +++ b/server-plugin/plugin.js @@ -49,6 +49,9 @@ module.exports = function startup(options, imports, register) { var gee = new EVENTS.EventEmitter(); + if (options.debug) + SMITH.debug = true; + if (options.messageRoute) { var serverId = "server-id-" + Date.now(); From 0100119264e40b4f9d4cbca501078264e55e7616 Mon Sep 17 00:00:00 2001 From: Ruben Daniels Date: Wed, 3 Jul 2013 18:29:47 +0000 Subject: [PATCH 3/7] working on vfs improvements --- server-plugin/plugin.js | 54 ++- server-plugin/www/client.js | 940 ++++++++++++++++++------------------ 2 files changed, 523 insertions(+), 471 deletions(-) diff --git a/server-plugin/plugin.js b/server-plugin/plugin.js index ffabb1a..5203fbf 100644 --- a/server-plugin/plugin.js +++ b/server-plugin/plugin.js @@ -8,7 +8,7 @@ const EVENTS = require("events"); // Switch from `away` to `disconnect` after this many milliseconds. const RECONNECT_TIMEOUT = 60 * 1000; - +const THRESHOLD = 300; // Time to detect the connection is gone var engines = []; @@ -94,23 +94,46 @@ module.exports = function startup(options, imports, register) { delete timeouts[id]; } if (!connections[id]) { + buffers[id] = []; + buffers["_" + id] = []; connections[id] = { ee: new EVENTS.EventEmitter(), - transport: transport + transport: transport, + sequence: 0 }; + gee.emit("connect", { id: id, transport: connections[id].transport, on: connections[id].ee.on.bind(connections[id].ee), once: connections[id].ee.once.bind(connections[id].ee), send: function(message) { - if (timeouts[id]) { - if (!buffers[id]) { - buffers[id] = []; - } + // Sequence number used to catch duplicates + if (message.length) { + message.push(++connections[id].sequence); + if (connections[id].sequence > 30000) + connections[id].sequence = 0; + } + + var transport = connections[id].transport; + console.log("-------", transport.socket.readyState); + if (timeouts[id] || transport.socket.readyState != "open") { buffers[id].push(message); - } else if (connections[id]) { - connections[id].transport.send(message); + } + else if (connections[id]) { + // Clear Existing Buffer > THRESHOLD + var now = Date.now(); + var items = buffers["_" + id]; + for (var i = items.length - 1; i >= 0; i--) { + if (now - items[i][1] > THRESHOLD) + items.splice(i, 1); + } + + // Add item to buffer + items.push([message, Date.now()]); + + // Send message + transport.send(message); } } }); @@ -118,9 +141,9 @@ module.exports = function startup(options, imports, register) { connections[id].transport = transport; if (buffers[id]) { buffers[id].forEach(function(message) { - connections[id].transport.send(message); + transport.send(message); }); - delete buffers[id]; + buffers[id] = []; } connections[id].ee.emit("back", {transport: transport}); } @@ -140,8 +163,19 @@ module.exports = function startup(options, imports, register) { connections[id].ee.emit("disconnect", reason); delete connections[id]; } + delete buffers[id]; + delete buffers["_" + id]; id = false; }, RECONNECT_TIMEOUT); + + var now = Date.now(); + buffers["_" + id].forEach(function(iter){ + if (now - iter[1] < THRESHOLD) { + buffers[id].push(iter[0]); + } + }); + buffers["_" + id] = []; + connections[id].ee.emit("away"); }); diff --git a/server-plugin/www/client.js b/server-plugin/www/client.js index 8b4ddb7..171bb61 100644 --- a/server-plugin/www/client.js +++ b/server-plugin/www/client.js @@ -1,466 +1,484 @@ define(function(require, exports, module) { - require("engine.io"); - var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`. - var SMITH = require("smith"); - var EVENTS = require("smith/events-amd"); - - var transports = []; - var debugHandler = null; - var connectCounter = 0; - - function inherits(Child, Parent) { - Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); - } - - function getLogTimestamp() { - var date = new Date(); - return "[" + date.toLocaleTimeString() + ":" + date.getMilliseconds() + "]"; - } - - var Transport = function(options) { - this.options = options; - this.options.host = this.options.host || document.location.hostname; - if (this.options.port === 443) { - this.options.secure = true; - } - this.options.port = this.options.port || document.location.port; - this.options.path = this.options.prefix; - delete this.options.prefix; - this.id = false; - this.serverId = false; - this.connecting = false; - this.connected = false; - this.away = false; - this.buffer = false; - this.connectIndex = -1; - } - - inherits(Transport, EVENTS.EventEmitter); - - Transport.prototype.getUri = function() { - return "http" + ((this.options.secure)?"s":"") + "://" + - this.options.host + - ((this.options.port)?":"+this.options.port:"") + - this.options.path + - this.options.resource; - } - - Transport.prototype.connect = function(options, callback) { - var _self = this; - connectCounter += 1; - _self.connectIndex = connectCounter; - - var failed = false; - - function log(message) { - console.log(getLogTimestamp() + "[smith.io:" + _self.connectIndex + ":" + _self.getUri() + "] " + message); - } - - try { - - if (!_self.away && _self.connected) { - throw new Error("smith.io '" + _self.getUri() + "' is already connected!"); - } - if (_self.connecting) { - throw new Error("smith.io '" + _self.getUri() + "' is already connecting!"); - } - - if (_self.debug) { - log("Try connect", options); - } - - _self.connecting = true; - - function reconnect() { - if (_self.debug) { - log("Trigger re-connect scheduler."); - } - - if (typeof options.reconnectAttempt === "undefined") { - options.reconnectAttempt = 0; - } - - options.reconnectAttempt += 1; - - if (options.reconnectAttempt === 6) { - _self.away = false; - _self.connected = false; - try { - _self.emit("disconnect", "away re-connect attempts exceeded"); - } catch(err) { - console.error(err.stack); - } - } - - var delay = 250; - if (options.reconnectAttempt > 10) { - delay = 15 * 1000; - } - else if (options.reconnectAttempt > 5) { - delay = 5 * 1000; - } - else if (options.reconnectAttempt > 3) { - delay = 1 * 1000; - } - - if (_self.debug) { - log("Schedule re-connect in: " + delay); - } - - setTimeout(function() { - - if (!_self.away && _self.connected) { - if (_self.debug) { - log("Don't re-connect. Already connected!"); - } - return; - } - if (_self.connecting) { - if (_self.debug) { - log("Don't re-connect. Already connecting!"); - } - return; - } - - try { - _self.emit("reconnect", options.reconnectAttempt); - } catch(err) { - console.error(err.stack); - } - - _self.connect({ - reconnectAttempt: options.reconnectAttempt, - fireConnect: (options.reconnectAttempt >= 6) ? true : false - }, function(err) { - if (err) { - reconnect(); - return; - } - }); - }, delay); - } - - _self.socket = new ENGINE_IO.Socket(_self.options); - - _self.socket.on("error", function (err) { - if (_self.debug) { - log("Connect error (failed: " + failed + "): " + err.stack); - } - // Only relay first connection error. - if (!failed) { - failed = true; - - _self.connecting = false; - try { - callback(err); - } catch(err) { - console.error(err.stack); - } - - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on error"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - } - }); - - _self.socket.on("pong", function (pongPayload) { - if (failed) { - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } else - if (pongPayload && pongPayload.serverId && pongPayload.serverId !== _self.serverId) { - // If `pongPayload.serverId` does not match our cached `_self.serverId` we close - // the connection and re-connect as the server instance has changed and we may need to re-init. - if (_self.debug) { - log("Detected server reboot on heartbeat. Close connection."); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } - _self.emit("heartbeat"); - }); - - _self.socket.on("heartbeat", function () { - if (failed) { - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } - }); - - _self.socket.on("open", function () { - - _self.connecting = false; - - if (failed) { - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on open"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } - - if (_self.debug) { - log("Init new socket (" + _self.socket.id + ")"); - } - - _self.transport = new SMITH.EngineIoTransport(_self.socket); - - _self.transport.on("legacy", function (message) { - if (typeof message === "object" && message.type === "__ASSIGN-ID__") { - - if (_self.serverId !== false && _self.serverId !== message.serverId) { - // If `message.serverId` does not match our cached `_self.serverId` we issue - // a connect as the server instance has changed and we may need to re-init. - if (_self.debug) { - log("Detected server reboot on handshake. Issue re-connect."); - } - options.fireConnect = true; - if (_self.connected === true) { - _self.connected = false; - try { - _self.emit("disconnect", "server reboot"); - } catch(err) { - console.error(err.stack); - } - } - } - _self.serverId = message.serverId; - - if (_self.id === false) { - _self.id = message.id; - } - _self.transport.send({ - type: "__ANNOUNCE-ID__", - id: _self.id - }); - if (_self.away && (Date.now()-_self.away) > 30*1000) { - if (_self.debug) { - log("Long away (hibernate) detected. Issue re-connect."); - } - options.fireConnect = true; - if (_self.connected === true) { - _self.connected = false; - try { - _self.emit("disconnect", "long away (hibernate)"); - } catch(err) { - console.error(err.stack); - } - } - } - _self.away = false; - _self.connected = true; - if (options.fireConnect !== false) { - try { - _self.emit("connect", _self); - } catch(err) { - console.error(err.stack); - } - } - else if (options.reconnectAttempt > 0) { - try { - _self.emit("back"); - } catch(err) { - console.error(err.stack); - } - } - options.reconnectAttempt = 0; - if (_self.buffer) { - _self.buffer.forEach(function(message) { - _self.transport.send(message); - }); - _self.buffer = false; - } - } else { - try { - _self.emit("message", message); - } catch(err) { - console.error(err.stack); - } - } - }); - - _self.transport.on("disconnect", ondisconnect); - _self.transport.on("error", ondisconnect); - var once = false; - function ondisconnect(reason) { - // Ignore probe errors - if (/probe error/i.test(reason)) return; - - // Only one try to reconnect - if (once) return; - once = true; - - if (_self.debug) { - log("Disconnect socket: " + reason); - } - - if (_self.connected) { - _self.away = Date.now(); - try { - _self.emit("away"); - } catch(err) { - console.error(err.stack); - } - } - - reconnect(); - }; - callback(null, _self); - }); - - } catch(err) { - callback(err); - } - } - Transport.prototype.send = function(message) { - if (this.connected === false) { - var err = new Error("Cannot send smith.io message while disconnected! Sender should respect connect/disconnect states!"); - // We log error here in case sender does not catch. - console.log(err.stack); - throw err; - } - else if(this.away) { - if (!this.buffer) { - this.buffer = []; - } - this.buffer.push(message); - } - this.transport.send(message); - } - - exports.connect = function(options, callback) { - var transport = new Transport(options, callback); - transports.push(transport); - if (debugHandler) { - debugHandler.hookTransport(transport); - } - if (transport.debug) { - console.log(getLogTimestamp() + "[smith.io:" + transport.getUri() + "] New transport", options); - } - transport.connect({}, callback); - return transport; - } - - exports.setDebug = function(debug, events) { - if (debugHandler !== null) { - debugHandler.stop(); - if (window.localStorage) { - localStorage.smithioDebug = ""; - localStorage.debug = ""; - } - } - if (!debug) return; - events = events || []; - if (window.localStorage) { - localStorage.smithioDebug = JSON.stringify([debug, events]); - } - debugHandler = { - transports: [], - handlers: [], - start: function() { - transports.forEach(debugHandler.hookTransport); - }, - stop: function() { - transports.forEach(debugHandler.unhookTransport); - debugHandler = null; - }, - hookTransport: function(transport) { - var index = debugHandler.transports.indexOf(transport); - if (index !== -1) return; - - function log(message) { - console.log(getLogTimestamp() + "[smith.io:" + transport.connectIndex + ":" + transport.getUri() + "] " + message); - } - - log("Hook debugger"); - - var listeners = {}; - - transport.debug = true; - - transport.on("connect", listeners["connect"] = function() { - log("Connect"); - }); - transport.on("reconnect", listeners["reconnect"] = function(attempt) { - log("Reconnect: " + attempt); - }); - transport.on("disconnect", listeners["disconnect"] = function(reason) { - log("Disconnect: " + reason); - }); - transport.on("heartbeat", listeners["heartbeat"] = function(message) { - log("Heartbeat"); - }); - if (events.indexOf("message") !== -1) { - transport.on("message", listeners["message"] = function(message) { - log("Message", message); - }); - } - transport.on("away", listeners["away"] = function() { - log("Away"); - }); - transport.on("back", listeners["back"] = function() { - log("Back"); - }); - - if (events.indexOf("engine.io") !== -1) { - if (window.localStorage) { - localStorage.debug = "*"; - } - } - - debugHandler.transports.push(transport); - debugHandler.handlers.push({ - unhook: function() { - log("Unhook debugger"); - transport.debug = false; - for (var type in listeners) { - transport.removeListener(type, listeners[type]); - } - } - }); - }, - unhookTransport: function(transport) { - var index = debugHandler.transports.indexOf(transport); - if (index === -1) return; - debugHandler.transports.splice(index, 1); - debugHandler.handlers[index].unhook(); - debugHandler.handlers.splice(index, 1); - } - }; - debugHandler.start(); - } - - if (window.localStorage && localStorage.smithioDebug) { - exports.setDebug.apply(null, JSON.parse(localStorage.smithioDebug)); - } + require("engine.io"); + var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`. + var SMITH = require("smith"); + var EVENTS = require("smith/events-amd"); + + var transports = []; + var debugHandler = null; + var connectCounter = 0; + + function inherits(Child, Parent) { + Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); + } + + function getLogTimestamp() { + var date = new Date(); + return "[" + date.toLocaleTimeString() + ":" + date.getMilliseconds() + "]"; + } + + var Transport = function(options) { + this.options = options; + this.options.host = this.options.host || document.location.hostname; + if (this.options.port === 443) { + this.options.secure = true; + } + this.options.port = this.options.port || document.location.port; + this.options.path = this.options.prefix; + delete this.options.prefix; + this.id = false; + this.serverId = false; + this.connecting = false; + this.connected = false; + this.away = false; + this.buffer = false; + this.connectIndex = -1; + } + + inherits(Transport, EVENTS.EventEmitter); + + Transport.prototype.getUri = function() { + return "http" + ((this.options.secure)?"s":"") + "://" + + this.options.host + + ((this.options.port)?":"+this.options.port:"") + + this.options.path + + this.options.resource; + } + + Transport.prototype.connect = function(options, callback) { + var _self = this; + connectCounter += 1; + _self.connectIndex = connectCounter; + + var failed = false; + + function log(message) { + console.log(getLogTimestamp() + "[smith.io:" + _self.connectIndex + ":" + _self.getUri() + "] " + message); + } + + try { + + if (!_self.away && _self.connected) { + throw new Error("smith.io '" + _self.getUri() + "' is already connected!"); + } + if (_self.connecting) { + throw new Error("smith.io '" + _self.getUri() + "' is already connecting!"); + } + + if (_self.debug) { + log("Try connect", options); + } + + _self.connecting = true; + + function reconnect() { + if (_self.debug) { + log("Trigger re-connect scheduler."); + } + + if (typeof options.reconnectAttempt === "undefined") { + options.reconnectAttempt = 0; + } + + options.reconnectAttempt += 1; + + if (options.reconnectAttempt === 6) { + _self.away = false; + _self.connected = false; + try { + _self.emit("disconnect", "away re-connect attempts exceeded"); + } catch(err) { + console.error(err.stack); + } + } + + var delay = 250; + if (options.reconnectAttempt > 10) { + delay = 15 * 1000; + } + else if (options.reconnectAttempt > 5) { + delay = 5 * 1000; + } + else if (options.reconnectAttempt > 3) { + delay = 1 * 1000; + } + + if (_self.debug) { + log("Schedule re-connect in: " + delay); + } + + setTimeout(function() { + + if (!_self.away && _self.connected) { + if (_self.debug) { + log("Don't re-connect. Already connected!"); + } + return; + } + if (_self.connecting) { + if (_self.debug) { + log("Don't re-connect. Already connecting!"); + } + return; + } + + try { + _self.emit("reconnect", options.reconnectAttempt); + } catch(err) { + console.error(err.stack); + } + + _self.connect({ + reconnectAttempt: options.reconnectAttempt, + fireConnect: (options.reconnectAttempt >= 6) ? true : false + }, function(err) { + if (err) { + reconnect(); + return; + } + }); + }, delay); + } + + var writeBuffer = _self.socket && _self.socket.writeBuffer; + _self.socket = new ENGINE_IO.Socket(_self.options); + if (writeBuffer) { + var onback, ondisconnect; + _self.on("back", onback = function(){ + writeBuffer.forEach(function(packet){ + _self.socket.sendPacket(packet.type, packet.data); + }); + + _self.removeListener("back", onback); + _self.removeListener("disconnect", ondisconnect); + }); + _self.on("disconnect", ondisconnect = function(){ + writeBuffer = null; + _self.removeListener("back", onback); + _self.removeListener("disconnect", ondisconnect); + }); + } + + _self.socket.on("error", function (err) { + if (_self.debug) { + log("Connect error (failed: " + failed + "): " + err.stack); + } + // Only relay first connection error. + if (!failed) { + failed = true; + + _self.connecting = false; + try { + callback(err); + } catch(err) { + console.error(err.stack); + } + + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on error"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + } + }); + + _self.socket.on("pong", function (pongPayload) { + if (failed) { + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } else + if (pongPayload && pongPayload.serverId && pongPayload.serverId !== _self.serverId) { + // If `pongPayload.serverId` does not match our cached `_self.serverId` we close + // the connection and re-connect as the server instance has changed and we may need to re-init. + if (_self.debug) { + log("Detected server reboot on heartbeat. Close connection."); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } + _self.emit("heartbeat"); + }); + + _self.socket.on("heartbeat", function () { + if (failed) { + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } + }); + + _self.socket.on("open", function () { + + _self.connecting = false; + + if (failed) { + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on open"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } + + if (_self.debug) { + log("Init new socket (" + _self.socket.id + ")"); + } + + _self.transport = new SMITH.EngineIoTransport(_self.socket); + + _self.transport.on("legacy", function (message) { + if (typeof message === "object" && message.type === "__ASSIGN-ID__") { + + if (_self.serverId !== false && _self.serverId !== message.serverId) { + // If `message.serverId` does not match our cached `_self.serverId` we issue + // a connect as the server instance has changed and we may need to re-init. + if (_self.debug) { + log("Detected server reboot on handshake. Issue re-connect."); + } + options.fireConnect = true; + if (_self.connected === true) { + _self.connected = false; + try { + _self.emit("disconnect", "server reboot"); + } catch(err) { + console.error(err.stack); + } + } + } + _self.serverId = message.serverId; + + if (_self.id === false) { + _self.id = message.id; + } + _self.transport.send({ + type: "__ANNOUNCE-ID__", + id: _self.id + }); + if (_self.away && (Date.now()-_self.away) > 30*1000) { + if (_self.debug) { + log("Long away (hibernate) detected. Issue re-connect."); + } + options.fireConnect = true; + if (_self.connected === true) { + _self.connected = false; + try { + _self.emit("disconnect", "long away (hibernate)"); + } catch(err) { + console.error(err.stack); + } + } + } + _self.away = false; + _self.connected = true; + if (options.fireConnect !== false) { + try { + _self.emit("connect", _self); + } catch(err) { + console.error(err.stack); + } + } + else if (options.reconnectAttempt > 0) { + try { + _self.emit("back"); + } catch(err) { + console.error(err.stack); + } + } + options.reconnectAttempt = 0; + if (_self.buffer) { + _self.buffer.forEach(function(message) { + _self.transport.send(message); + }); + _self.buffer = false; + } + } else { + try { + _self.emit("message", message); + } catch(err) { + console.error(err.stack); + } + } + }); + + _self.transport.on("disconnect", ondisconnect); + _self.transport.on("error", ondisconnect); + var once = false; + function ondisconnect(reason) { + // Ignore probe errors + if (/probe error/i.test(reason)) return; + + // Only one try to reconnect + if (once) return; + once = true; + + if (_self.debug) { + log("Disconnect socket: " + reason); + } + + if (_self.connected) { + _self.away = Date.now(); + try { + _self.emit("away"); + } catch(err) { + console.error(err.stack); + } + } + + reconnect(); + }; + callback(null, _self); + }); + + } catch(err) { + callback(err); + } + } + Transport.prototype.send = function(message) { + if (this.connected === false) { + var err = new Error("Cannot send smith.io message while disconnected! Sender should respect connect/disconnect states!"); + // We log error here in case sender does not catch. + console.log(err.stack); + throw err; + } + // else if(this.away) { + // if (!this.buffer) { + // this.buffer = []; + // } + // this.buffer.push(message); + // } + message + this.transport.send(message); + } + + exports.connect = function(options, callback) { + var transport = new Transport(options, callback); + transports.push(transport); + if (debugHandler) { + debugHandler.hookTransport(transport); + } + if (transport.debug) { + console.log(getLogTimestamp() + "[smith.io:" + transport.getUri() + "] New transport", options); + } + transport.connect({}, callback); + return transport; + } + + exports.setDebug = function(debug, events) { + if (debugHandler !== null) { + debugHandler.stop(); + if (window.localStorage) { + localStorage.smithioDebug = ""; + localStorage.debug = ""; + } + } + if (!debug) return; + events = events || []; + if (window.localStorage) { + localStorage.smithioDebug = JSON.stringify([debug, events]); + } + debugHandler = { + transports: [], + handlers: [], + start: function() { + transports.forEach(debugHandler.hookTransport); + }, + stop: function() { + transports.forEach(debugHandler.unhookTransport); + debugHandler = null; + }, + hookTransport: function(transport) { + var index = debugHandler.transports.indexOf(transport); + if (index !== -1) return; + + function log(message) { + console.log(getLogTimestamp() + "[smith.io:" + transport.connectIndex + ":" + transport.getUri() + "] " + message); + } + + log("Hook debugger"); + + var listeners = {}; + + transport.debug = true; + + transport.on("connect", listeners["connect"] = function() { + log("Connect"); + }); + transport.on("reconnect", listeners["reconnect"] = function(attempt) { + log("Reconnect: " + attempt); + }); + transport.on("disconnect", listeners["disconnect"] = function(reason) { + log("Disconnect: " + reason); + }); + transport.on("heartbeat", listeners["heartbeat"] = function(message) { + log("Heartbeat"); + }); + if (events.indexOf("message") !== -1) { + transport.on("message", listeners["message"] = function(message) { + log("Message", message); + }); + } + transport.on("away", listeners["away"] = function() { + log("Away"); + }); + transport.on("back", listeners["back"] = function() { + log("Back"); + }); + + if (events.indexOf("engine.io") !== -1) { + if (window.localStorage) { + localStorage.debug = "*"; + } + } + + debugHandler.transports.push(transport); + debugHandler.handlers.push({ + unhook: function() { + log("Unhook debugger"); + transport.debug = false; + for (var type in listeners) { + transport.removeListener(type, listeners[type]); + } + } + }); + }, + unhookTransport: function(transport) { + var index = debugHandler.transports.indexOf(transport); + if (index === -1) return; + debugHandler.transports.splice(index, 1); + debugHandler.handlers[index].unhook(); + debugHandler.handlers.splice(index, 1); + } + }; + debugHandler.start(); + } + + if (window.localStorage && localStorage.smithioDebug) { + exports.setDebug.apply(null, JSON.parse(localStorage.smithioDebug)); + } }); From fa91b1caa03d37939269d64157207583e1aa7178 Mon Sep 17 00:00:00 2001 From: Ruben Daniels Date: Wed, 3 Jul 2013 19:19:35 +0000 Subject: [PATCH 4/7] Server is consistent --- server-plugin/www/client.js | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server-plugin/www/client.js b/server-plugin/www/client.js index 171bb61..9dd0752 100644 --- a/server-plugin/www/client.js +++ b/server-plugin/www/client.js @@ -35,6 +35,7 @@ define(function(require, exports, module) { this.away = false; this.buffer = false; this.connectIndex = -1; + this.sequence = 0; } inherits(Transport, EVENTS.EventEmitter); @@ -371,7 +372,13 @@ define(function(require, exports, module) { // } // this.buffer.push(message); // } - message + + if (message.length) { + message.push(++this.sequence); + if (this.sequence > 30000) + this.sequence = 0; + } + this.transport.send(message); } From 90cac3a2b8f7324b8b2c5a7b5698fa094020cf8a Mon Sep 17 00:00:00 2001 From: Ruben Daniels Date: Wed, 3 Jul 2013 19:31:03 +0000 Subject: [PATCH 5/7] Server and client now behave the same way --- server-plugin/plugin.js | 1 - server-plugin/www/client.js | 54 ++++++++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/server-plugin/plugin.js b/server-plugin/plugin.js index 5203fbf..0c044b7 100644 --- a/server-plugin/plugin.js +++ b/server-plugin/plugin.js @@ -116,7 +116,6 @@ module.exports = function startup(options, imports, register) { } var transport = connections[id].transport; - console.log("-------", transport.socket.readyState); if (timeouts[id] || transport.socket.readyState != "open") { buffers[id].push(message); } diff --git a/server-plugin/www/client.js b/server-plugin/www/client.js index 9dd0752..eace9bb 100644 --- a/server-plugin/www/client.js +++ b/server-plugin/www/client.js @@ -5,10 +5,11 @@ define(function(require, exports, module) { var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`. var SMITH = require("smith"); var EVENTS = require("smith/events-amd"); - + var transports = []; var debugHandler = null; var connectCounter = 0; + var THRESHOLD = 300; function inherits(Child, Parent) { Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); @@ -36,6 +37,8 @@ define(function(require, exports, module) { this.buffer = false; this.connectIndex = -1; this.sequence = 0; + this.buffer = []; + this.sbuffer = []; } inherits(Transport, EVENTS.EventEmitter); @@ -304,19 +307,20 @@ define(function(require, exports, module) { } } else if (options.reconnectAttempt > 0) { + if (_self.buffer) { + _self.buffer.forEach(function(message) { + _self.transport.send(message); + }); + _self.buffer = []; + } + try { _self.emit("back"); } catch(err) { console.error(err.stack); - } + } } options.reconnectAttempt = 0; - if (_self.buffer) { - _self.buffer.forEach(function(message) { - _self.transport.send(message); - }); - _self.buffer = false; - } } else { try { _self.emit("message", message); @@ -343,6 +347,15 @@ define(function(require, exports, module) { if (_self.connected) { _self.away = Date.now(); + + var now = Date.now(); + _self.sbuffer.forEach(function(iter){ + if (now - iter[1] < THRESHOLD) { + _self.buffer.push(iter[0]); + } + }); + _self.sbuffer = []; + try { _self.emit("away"); } catch(err) { @@ -366,12 +379,6 @@ define(function(require, exports, module) { console.log(err.stack); throw err; } - // else if(this.away) { - // if (!this.buffer) { - // this.buffer = []; - // } - // this.buffer.push(message); - // } if (message.length) { message.push(++this.sequence); @@ -379,7 +386,24 @@ define(function(require, exports, module) { this.sequence = 0; } - this.transport.send(message); + if (this.away) { + this.buffer.push(message); + } + else { + // Clear Existing Buffer > THRESHOLD + var now = Date.now(); + var items = this.sbuffer; + for (var i = items.length - 1; i >= 0; i--) { + if (now - items[i][1] > THRESHOLD) + items.splice(i, 1); + } + + // Add item to buffer + items.push([message, Date.now()]); + + // Send message + this.transport.send(message); + } } exports.connect = function(options, callback) { From c813df4d860b62e90ae3f6623ac7359779c4162d Mon Sep 17 00:00:00 2001 From: Ruben Daniels Date: Wed, 3 Jul 2013 19:56:01 +0000 Subject: [PATCH 6/7] converted to tabs --- server-plugin/www/client.js | 1020 +++++++++++++++++------------------ 1 file changed, 510 insertions(+), 510 deletions(-) diff --git a/server-plugin/www/client.js b/server-plugin/www/client.js index eace9bb..9103095 100644 --- a/server-plugin/www/client.js +++ b/server-plugin/www/client.js @@ -1,515 +1,515 @@ define(function(require, exports, module) { - require("engine.io"); - var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`. - var SMITH = require("smith"); - var EVENTS = require("smith/events-amd"); - - var transports = []; - var debugHandler = null; - var connectCounter = 0; - var THRESHOLD = 300; - - function inherits(Child, Parent) { - Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); - } - - function getLogTimestamp() { - var date = new Date(); - return "[" + date.toLocaleTimeString() + ":" + date.getMilliseconds() + "]"; - } - - var Transport = function(options) { - this.options = options; - this.options.host = this.options.host || document.location.hostname; - if (this.options.port === 443) { - this.options.secure = true; - } - this.options.port = this.options.port || document.location.port; - this.options.path = this.options.prefix; - delete this.options.prefix; - this.id = false; - this.serverId = false; - this.connecting = false; - this.connected = false; - this.away = false; - this.buffer = false; - this.connectIndex = -1; - this.sequence = 0; - this.buffer = []; - this.sbuffer = []; - } - - inherits(Transport, EVENTS.EventEmitter); - - Transport.prototype.getUri = function() { - return "http" + ((this.options.secure)?"s":"") + "://" + - this.options.host + - ((this.options.port)?":"+this.options.port:"") + - this.options.path + - this.options.resource; - } - - Transport.prototype.connect = function(options, callback) { - var _self = this; - connectCounter += 1; - _self.connectIndex = connectCounter; - - var failed = false; - - function log(message) { - console.log(getLogTimestamp() + "[smith.io:" + _self.connectIndex + ":" + _self.getUri() + "] " + message); - } - - try { - - if (!_self.away && _self.connected) { - throw new Error("smith.io '" + _self.getUri() + "' is already connected!"); - } - if (_self.connecting) { - throw new Error("smith.io '" + _self.getUri() + "' is already connecting!"); - } - - if (_self.debug) { - log("Try connect", options); - } - - _self.connecting = true; - - function reconnect() { - if (_self.debug) { - log("Trigger re-connect scheduler."); - } - - if (typeof options.reconnectAttempt === "undefined") { - options.reconnectAttempt = 0; - } - - options.reconnectAttempt += 1; - - if (options.reconnectAttempt === 6) { - _self.away = false; - _self.connected = false; - try { - _self.emit("disconnect", "away re-connect attempts exceeded"); - } catch(err) { - console.error(err.stack); - } - } - - var delay = 250; - if (options.reconnectAttempt > 10) { - delay = 15 * 1000; - } - else if (options.reconnectAttempt > 5) { - delay = 5 * 1000; - } - else if (options.reconnectAttempt > 3) { - delay = 1 * 1000; - } - - if (_self.debug) { - log("Schedule re-connect in: " + delay); - } - - setTimeout(function() { - - if (!_self.away && _self.connected) { - if (_self.debug) { - log("Don't re-connect. Already connected!"); - } - return; - } - if (_self.connecting) { - if (_self.debug) { - log("Don't re-connect. Already connecting!"); - } - return; - } - - try { - _self.emit("reconnect", options.reconnectAttempt); - } catch(err) { - console.error(err.stack); - } - - _self.connect({ - reconnectAttempt: options.reconnectAttempt, - fireConnect: (options.reconnectAttempt >= 6) ? true : false - }, function(err) { - if (err) { - reconnect(); - return; - } - }); - }, delay); - } - - var writeBuffer = _self.socket && _self.socket.writeBuffer; - _self.socket = new ENGINE_IO.Socket(_self.options); - if (writeBuffer) { - var onback, ondisconnect; - _self.on("back", onback = function(){ - writeBuffer.forEach(function(packet){ - _self.socket.sendPacket(packet.type, packet.data); - }); - - _self.removeListener("back", onback); - _self.removeListener("disconnect", ondisconnect); - }); - _self.on("disconnect", ondisconnect = function(){ - writeBuffer = null; - _self.removeListener("back", onback); - _self.removeListener("disconnect", ondisconnect); - }); - } - - _self.socket.on("error", function (err) { - if (_self.debug) { - log("Connect error (failed: " + failed + "): " + err.stack); - } - // Only relay first connection error. - if (!failed) { - failed = true; - - _self.connecting = false; - try { - callback(err); - } catch(err) { - console.error(err.stack); - } - - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on error"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - } - }); - - _self.socket.on("pong", function (pongPayload) { - if (failed) { - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } else - if (pongPayload && pongPayload.serverId && pongPayload.serverId !== _self.serverId) { - // If `pongPayload.serverId` does not match our cached `_self.serverId` we close - // the connection and re-connect as the server instance has changed and we may need to re-init. - if (_self.debug) { - log("Detected server reboot on heartbeat. Close connection."); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } - _self.emit("heartbeat"); - }); - - _self.socket.on("heartbeat", function () { - if (failed) { - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } - }); - - _self.socket.on("open", function () { - - _self.connecting = false; - - if (failed) { - if (_self.debug) { - log("Close failed socket (" + _self.socket.readyState + ") on open"); - } - if (_self.socket.readyState !== "closed") { - try { - _self.socket.close(); - } catch(err) {} - } - return; - } - - if (_self.debug) { - log("Init new socket (" + _self.socket.id + ")"); - } - - _self.transport = new SMITH.EngineIoTransport(_self.socket); - - _self.transport.on("legacy", function (message) { - if (typeof message === "object" && message.type === "__ASSIGN-ID__") { - - if (_self.serverId !== false && _self.serverId !== message.serverId) { - // If `message.serverId` does not match our cached `_self.serverId` we issue - // a connect as the server instance has changed and we may need to re-init. - if (_self.debug) { - log("Detected server reboot on handshake. Issue re-connect."); - } - options.fireConnect = true; - if (_self.connected === true) { - _self.connected = false; - try { - _self.emit("disconnect", "server reboot"); - } catch(err) { - console.error(err.stack); - } - } - } - _self.serverId = message.serverId; - - if (_self.id === false) { - _self.id = message.id; - } - _self.transport.send({ - type: "__ANNOUNCE-ID__", - id: _self.id - }); - if (_self.away && (Date.now()-_self.away) > 30*1000) { - if (_self.debug) { - log("Long away (hibernate) detected. Issue re-connect."); - } - options.fireConnect = true; - if (_self.connected === true) { - _self.connected = false; - try { - _self.emit("disconnect", "long away (hibernate)"); - } catch(err) { - console.error(err.stack); - } - } - } - _self.away = false; - _self.connected = true; - if (options.fireConnect !== false) { - try { - _self.emit("connect", _self); - } catch(err) { - console.error(err.stack); - } - } - else if (options.reconnectAttempt > 0) { - if (_self.buffer) { - _self.buffer.forEach(function(message) { - _self.transport.send(message); - }); - _self.buffer = []; - } - - try { - _self.emit("back"); - } catch(err) { - console.error(err.stack); - } - } - options.reconnectAttempt = 0; - } else { - try { - _self.emit("message", message); - } catch(err) { - console.error(err.stack); - } - } - }); - - _self.transport.on("disconnect", ondisconnect); - _self.transport.on("error", ondisconnect); - var once = false; - function ondisconnect(reason) { - // Ignore probe errors - if (/probe error/i.test(reason)) return; - - // Only one try to reconnect - if (once) return; - once = true; - - if (_self.debug) { - log("Disconnect socket: " + reason); - } - - if (_self.connected) { - _self.away = Date.now(); - - var now = Date.now(); - _self.sbuffer.forEach(function(iter){ - if (now - iter[1] < THRESHOLD) { - _self.buffer.push(iter[0]); - } - }); - _self.sbuffer = []; - - try { - _self.emit("away"); - } catch(err) { - console.error(err.stack); - } - } - - reconnect(); - }; - callback(null, _self); - }); - - } catch(err) { - callback(err); - } - } - Transport.prototype.send = function(message) { - if (this.connected === false) { - var err = new Error("Cannot send smith.io message while disconnected! Sender should respect connect/disconnect states!"); - // We log error here in case sender does not catch. - console.log(err.stack); - throw err; - } - - if (message.length) { - message.push(++this.sequence); - if (this.sequence > 30000) - this.sequence = 0; - } - - if (this.away) { - this.buffer.push(message); - } - else { - // Clear Existing Buffer > THRESHOLD - var now = Date.now(); - var items = this.sbuffer; - for (var i = items.length - 1; i >= 0; i--) { - if (now - items[i][1] > THRESHOLD) - items.splice(i, 1); - } - - // Add item to buffer - items.push([message, Date.now()]); - - // Send message - this.transport.send(message); - } - } - - exports.connect = function(options, callback) { - var transport = new Transport(options, callback); - transports.push(transport); - if (debugHandler) { - debugHandler.hookTransport(transport); - } - if (transport.debug) { - console.log(getLogTimestamp() + "[smith.io:" + transport.getUri() + "] New transport", options); - } - transport.connect({}, callback); - return transport; - } - - exports.setDebug = function(debug, events) { - if (debugHandler !== null) { - debugHandler.stop(); - if (window.localStorage) { - localStorage.smithioDebug = ""; - localStorage.debug = ""; - } - } - if (!debug) return; - events = events || []; - if (window.localStorage) { - localStorage.smithioDebug = JSON.stringify([debug, events]); - } - debugHandler = { - transports: [], - handlers: [], - start: function() { - transports.forEach(debugHandler.hookTransport); - }, - stop: function() { - transports.forEach(debugHandler.unhookTransport); - debugHandler = null; - }, - hookTransport: function(transport) { - var index = debugHandler.transports.indexOf(transport); - if (index !== -1) return; - - function log(message) { - console.log(getLogTimestamp() + "[smith.io:" + transport.connectIndex + ":" + transport.getUri() + "] " + message); - } - - log("Hook debugger"); - - var listeners = {}; - - transport.debug = true; - - transport.on("connect", listeners["connect"] = function() { - log("Connect"); - }); - transport.on("reconnect", listeners["reconnect"] = function(attempt) { - log("Reconnect: " + attempt); - }); - transport.on("disconnect", listeners["disconnect"] = function(reason) { - log("Disconnect: " + reason); - }); - transport.on("heartbeat", listeners["heartbeat"] = function(message) { - log("Heartbeat"); - }); - if (events.indexOf("message") !== -1) { - transport.on("message", listeners["message"] = function(message) { - log("Message", message); - }); - } - transport.on("away", listeners["away"] = function() { - log("Away"); - }); - transport.on("back", listeners["back"] = function() { - log("Back"); - }); - - if (events.indexOf("engine.io") !== -1) { - if (window.localStorage) { - localStorage.debug = "*"; - } - } - - debugHandler.transports.push(transport); - debugHandler.handlers.push({ - unhook: function() { - log("Unhook debugger"); - transport.debug = false; - for (var type in listeners) { - transport.removeListener(type, listeners[type]); - } - } - }); - }, - unhookTransport: function(transport) { - var index = debugHandler.transports.indexOf(transport); - if (index === -1) return; - debugHandler.transports.splice(index, 1); - debugHandler.handlers[index].unhook(); - debugHandler.handlers.splice(index, 1); - } - }; - debugHandler.start(); - } - - if (window.localStorage && localStorage.smithioDebug) { - exports.setDebug.apply(null, JSON.parse(localStorage.smithioDebug)); - } + require("engine.io"); + var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`. + var SMITH = require("smith"); + var EVENTS = require("smith/events-amd"); + + var transports = []; + var debugHandler = null; + var connectCounter = 0; + var THRESHOLD = 300; + + function inherits(Child, Parent) { + Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); + } + + function getLogTimestamp() { + var date = new Date(); + return "[" + date.toLocaleTimeString() + ":" + date.getMilliseconds() + "]"; + } + + var Transport = function(options) { + this.options = options; + this.options.host = this.options.host || document.location.hostname; + if (this.options.port === 443) { + this.options.secure = true; + } + this.options.port = this.options.port || document.location.port; + this.options.path = this.options.prefix; + delete this.options.prefix; + this.id = false; + this.serverId = false; + this.connecting = false; + this.connected = false; + this.away = false; + this.buffer = false; + this.connectIndex = -1; + this.sequence = 0; + this.buffer = []; + this.sbuffer = []; + } + + inherits(Transport, EVENTS.EventEmitter); + + Transport.prototype.getUri = function() { + return "http" + ((this.options.secure)?"s":"") + "://" + + this.options.host + + ((this.options.port)?":"+this.options.port:"") + + this.options.path + + this.options.resource; + } + + Transport.prototype.connect = function(options, callback) { + var _self = this; + connectCounter += 1; + _self.connectIndex = connectCounter; + + var failed = false; + + function log(message) { + console.log(getLogTimestamp() + "[smith.io:" + _self.connectIndex + ":" + _self.getUri() + "] " + message); + } + + try { + + if (!_self.away && _self.connected) { + throw new Error("smith.io '" + _self.getUri() + "' is already connected!"); + } + if (_self.connecting) { + throw new Error("smith.io '" + _self.getUri() + "' is already connecting!"); + } + + if (_self.debug) { + log("Try connect", options); + } + + _self.connecting = true; + + function reconnect() { + if (_self.debug) { + log("Trigger re-connect scheduler."); + } + + if (typeof options.reconnectAttempt === "undefined") { + options.reconnectAttempt = 0; + } + + options.reconnectAttempt += 1; + + if (options.reconnectAttempt === 6) { + _self.away = false; + _self.connected = false; + try { + _self.emit("disconnect", "away re-connect attempts exceeded"); + } catch(err) { + console.error(err.stack); + } + } + + var delay = 250; + if (options.reconnectAttempt > 10) { + delay = 15 * 1000; + } + else if (options.reconnectAttempt > 5) { + delay = 5 * 1000; + } + else if (options.reconnectAttempt > 3) { + delay = 1 * 1000; + } + + if (_self.debug) { + log("Schedule re-connect in: " + delay); + } + + setTimeout(function() { + + if (!_self.away && _self.connected) { + if (_self.debug) { + log("Don't re-connect. Already connected!"); + } + return; + } + if (_self.connecting) { + if (_self.debug) { + log("Don't re-connect. Already connecting!"); + } + return; + } + + try { + _self.emit("reconnect", options.reconnectAttempt); + } catch(err) { + console.error(err.stack); + } + + _self.connect({ + reconnectAttempt: options.reconnectAttempt, + fireConnect: (options.reconnectAttempt >= 6) ? true : false + }, function(err) { + if (err) { + reconnect(); + return; + } + }); + }, delay); + } + + var writeBuffer = _self.socket && _self.socket.writeBuffer; + _self.socket = new ENGINE_IO.Socket(_self.options); + if (writeBuffer) { + var onback, ondisconnect; + _self.on("back", onback = function(){ + writeBuffer.forEach(function(packet){ + _self.socket.sendPacket(packet.type, packet.data); + }); + + _self.removeListener("back", onback); + _self.removeListener("disconnect", ondisconnect); + }); + _self.on("disconnect", ondisconnect = function(){ + writeBuffer = null; + _self.removeListener("back", onback); + _self.removeListener("disconnect", ondisconnect); + }); + } + + _self.socket.on("error", function (err) { + if (_self.debug) { + log("Connect error (failed: " + failed + "): " + err.stack); + } + // Only relay first connection error. + if (!failed) { + failed = true; + + _self.connecting = false; + try { + callback(err); + } catch(err) { + console.error(err.stack); + } + + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on error"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + } + }); + + _self.socket.on("pong", function (pongPayload) { + if (failed) { + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } else + if (pongPayload && pongPayload.serverId && pongPayload.serverId !== _self.serverId) { + // If `pongPayload.serverId` does not match our cached `_self.serverId` we close + // the connection and re-connect as the server instance has changed and we may need to re-init. + if (_self.debug) { + log("Detected server reboot on heartbeat. Close connection."); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } + _self.emit("heartbeat"); + }); + + _self.socket.on("heartbeat", function () { + if (failed) { + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on heartbeat"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } + }); + + _self.socket.on("open", function () { + + _self.connecting = false; + + if (failed) { + if (_self.debug) { + log("Close failed socket (" + _self.socket.readyState + ") on open"); + } + if (_self.socket.readyState !== "closed") { + try { + _self.socket.close(); + } catch(err) {} + } + return; + } + + if (_self.debug) { + log("Init new socket (" + _self.socket.id + ")"); + } + + _self.transport = new SMITH.EngineIoTransport(_self.socket); + + _self.transport.on("legacy", function (message) { + if (typeof message === "object" && message.type === "__ASSIGN-ID__") { + + if (_self.serverId !== false && _self.serverId !== message.serverId) { + // If `message.serverId` does not match our cached `_self.serverId` we issue + // a connect as the server instance has changed and we may need to re-init. + if (_self.debug) { + log("Detected server reboot on handshake. Issue re-connect."); + } + options.fireConnect = true; + if (_self.connected === true) { + _self.connected = false; + try { + _self.emit("disconnect", "server reboot"); + } catch(err) { + console.error(err.stack); + } + } + } + _self.serverId = message.serverId; + + if (_self.id === false) { + _self.id = message.id; + } + _self.transport.send({ + type: "__ANNOUNCE-ID__", + id: _self.id + }); + if (_self.away && (Date.now()-_self.away) > 30*1000) { + if (_self.debug) { + log("Long away (hibernate) detected. Issue re-connect."); + } + options.fireConnect = true; + if (_self.connected === true) { + _self.connected = false; + try { + _self.emit("disconnect", "long away (hibernate)"); + } catch(err) { + console.error(err.stack); + } + } + } + _self.away = false; + _self.connected = true; + if (options.fireConnect !== false) { + try { + _self.emit("connect", _self); + } catch(err) { + console.error(err.stack); + } + } + else if (options.reconnectAttempt > 0) { + if (_self.buffer) { + _self.buffer.forEach(function(message) { + _self.transport.send(message); + }); + _self.buffer = []; + } + + try { + _self.emit("back"); + } catch(err) { + console.error(err.stack); + } + } + options.reconnectAttempt = 0; + } else { + try { + _self.emit("message", message); + } catch(err) { + console.error(err.stack); + } + } + }); + + _self.transport.on("disconnect", ondisconnect); + _self.transport.on("error", ondisconnect); + var once = false; + function ondisconnect(reason) { + // Ignore probe errors + if (/probe error/i.test(reason)) return; + + // Only one try to reconnect + if (once) return; + once = true; + + if (_self.debug) { + log("Disconnect socket: " + reason); + } + + if (_self.connected) { + _self.away = Date.now(); + + var now = Date.now(); + _self.sbuffer.forEach(function(iter){ + if (now - iter[1] < THRESHOLD) { + _self.buffer.push(iter[0]); + } + }); + _self.sbuffer = []; + + try { + _self.emit("away"); + } catch(err) { + console.error(err.stack); + } + } + + reconnect(); + }; + callback(null, _self); + }); + + } catch(err) { + callback(err); + } + } + Transport.prototype.send = function(message) { + if (this.connected === false) { + var err = new Error("Cannot send smith.io message while disconnected! Sender should respect connect/disconnect states!"); + // We log error here in case sender does not catch. + console.log(err.stack); + throw err; + } + + if (message.length) { + message.push(++this.sequence); + if (this.sequence > 30000) + this.sequence = 0; + } + + if (this.away) { + this.buffer.push(message); + } + else { + // Clear Existing Buffer > THRESHOLD + var now = Date.now(); + var items = this.sbuffer; + for (var i = items.length - 1; i >= 0; i--) { + if (now - items[i][1] > THRESHOLD) + items.splice(i, 1); + } + + // Add item to buffer + items.push([message, Date.now()]); + + // Send message + this.transport.send(message); + } + } + + exports.connect = function(options, callback) { + var transport = new Transport(options, callback); + transports.push(transport); + if (debugHandler) { + debugHandler.hookTransport(transport); + } + if (transport.debug) { + console.log(getLogTimestamp() + "[smith.io:" + transport.getUri() + "] New transport", options); + } + transport.connect({}, callback); + return transport; + } + + exports.setDebug = function(debug, events) { + if (debugHandler !== null) { + debugHandler.stop(); + if (window.localStorage) { + localStorage.smithioDebug = ""; + localStorage.debug = ""; + } + } + if (!debug) return; + events = events || []; + if (window.localStorage) { + localStorage.smithioDebug = JSON.stringify([debug, events]); + } + debugHandler = { + transports: [], + handlers: [], + start: function() { + transports.forEach(debugHandler.hookTransport); + }, + stop: function() { + transports.forEach(debugHandler.unhookTransport); + debugHandler = null; + }, + hookTransport: function(transport) { + var index = debugHandler.transports.indexOf(transport); + if (index !== -1) return; + + function log(message) { + console.log(getLogTimestamp() + "[smith.io:" + transport.connectIndex + ":" + transport.getUri() + "] " + message); + } + + log("Hook debugger"); + + var listeners = {}; + + transport.debug = true; + + transport.on("connect", listeners["connect"] = function() { + log("Connect"); + }); + transport.on("reconnect", listeners["reconnect"] = function(attempt) { + log("Reconnect: " + attempt); + }); + transport.on("disconnect", listeners["disconnect"] = function(reason) { + log("Disconnect: " + reason); + }); + transport.on("heartbeat", listeners["heartbeat"] = function(message) { + log("Heartbeat"); + }); + if (events.indexOf("message") !== -1) { + transport.on("message", listeners["message"] = function(message) { + log("Message", message); + }); + } + transport.on("away", listeners["away"] = function() { + log("Away"); + }); + transport.on("back", listeners["back"] = function() { + log("Back"); + }); + + if (events.indexOf("engine.io") !== -1) { + if (window.localStorage) { + localStorage.debug = "*"; + } + } + + debugHandler.transports.push(transport); + debugHandler.handlers.push({ + unhook: function() { + log("Unhook debugger"); + transport.debug = false; + for (var type in listeners) { + transport.removeListener(type, listeners[type]); + } + } + }); + }, + unhookTransport: function(transport) { + var index = debugHandler.transports.indexOf(transport); + if (index === -1) return; + debugHandler.transports.splice(index, 1); + debugHandler.handlers[index].unhook(); + debugHandler.handlers.splice(index, 1); + } + }; + debugHandler.start(); + } + + if (window.localStorage && localStorage.smithioDebug) { + exports.setDebug.apply(null, JSON.parse(localStorage.smithioDebug)); + } }); From e20bccc7fa94d5ee5933a25e0060d7f697abe9bc Mon Sep 17 00:00:00 2001 From: Bas Date: Thu, 4 Jul 2013 12:30:52 +0000 Subject: [PATCH 7/7] fixed crash (message obj undefined) --- server-plugin/plugin.js | 2 +- server-plugin/www/client.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server-plugin/plugin.js b/server-plugin/plugin.js index 0c044b7..0fe954f 100644 --- a/server-plugin/plugin.js +++ b/server-plugin/plugin.js @@ -109,7 +109,7 @@ module.exports = function startup(options, imports, register) { once: connections[id].ee.once.bind(connections[id].ee), send: function(message) { // Sequence number used to catch duplicates - if (message.length) { + if (message.push) { message.push(++connections[id].sequence); if (connections[id].sequence > 30000) connections[id].sequence = 0; diff --git a/server-plugin/www/client.js b/server-plugin/www/client.js index 9103095..b76e831 100644 --- a/server-plugin/www/client.js +++ b/server-plugin/www/client.js @@ -380,7 +380,7 @@ define(function(require, exports, module) { throw err; } - if (message.length) { + if (message.push) { message.push(++this.sequence); if (this.sequence > 30000) this.sequence = 0;