Skip to content

Commit

Permalink
Merge pull request #147 from nats-io/tls-reconnect-leaks
Browse files Browse the repository at this point in the history
- [FIX] #145 - when using TLS reconnects would fail because of listener leaks (events firing on leaked streams)

- [FIX] node client fired error on any protocol error. Some protocol errors such as 'stale connection' should have initiated a reconnect.

- [FIX] if a client attempted to publish/subscribe to a subject for which it had no permission, an error was generated. New event ('permission_error') will now be sent to the client.
  • Loading branch information
Alberto Ricart authored May 16, 2017
2 parents 25cddf6 + be8fa84 commit 43604b2
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 139 deletions.
52 changes: 45 additions & 7 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var net = require('net'),
* Constants
*/

var VERSION = '0.7.18',
var VERSION = '0.7.20',

DEFAULT_PORT = 4222,
DEFAULT_PRE = 'nats://localhost:',
Expand Down Expand Up @@ -91,6 +91,8 @@ var VERSION = '0.7.18',
NATS_PROTOCOL_ERR = 'NATS_PROTOCOL_ERR',
REQ_TIMEOUT = 'REQ_TIMEOUT',
REQ_TIMEOUT_MSG_PREFIX = 'The request timed out for subscription id: ',
STALE_CONNECTION_ERR = "stale connection",
PERMISSIONS_ERR = "permissions violation",

// Pedantic Mode support
//Q_SUB = /^([^\.\*>\s]+|>$|\*)(\.([^\.\*>\s]+|>$|\*))*$/, // TODO: remove / never used
Expand Down Expand Up @@ -415,7 +417,7 @@ Client.prototype.setupHandlers = function() {
});

stream.on('close', function(hadError) {
client.closeStream();
client.closeStream();
client.emit('disconnect');
if (client.closed === true ||
client.options.reconnect === false ||
Expand All @@ -427,7 +429,7 @@ Client.prototype.setupHandlers = function() {
});

stream.on('error', function(exception) {
// If we were connected just return, close event will process
// If we were connected just return, close event will process
if (client.wasConnected === true && client.currentServer.didConnect === true) {
return;
}
Expand Down Expand Up @@ -552,8 +554,14 @@ Client.prototype.createConnection = function() {

// Select a server to connect to.
this.selectServer();
// Create the stream.
this.stream = net.createConnection(this.url.port, this.url.hostname);
// See #45 if we have a stream release the listeners
// otherwise in addition to the leak events will fire fire
if(this.stream) {
this.stream.removeAllListeners();
this.stream.end();
}
// Create the stream
this.stream = net.createConnection(this.url.port, this.url.hostname);
// Setup the proper handlers.
this.setupHandlers();
};
Expand Down Expand Up @@ -603,7 +611,7 @@ Client.prototype.closeStream = function() {
if (this.stream !== null) {
this.stream.end();
this.stream.destroy();
this.stream = null;
this.stream = null;
}
if (this.connected === true || this.closed === true) {
this.pongs = null;
Expand Down Expand Up @@ -756,6 +764,11 @@ Client.prototype.processInbound = function() {
// For optional yield
var start;

if(! client.stream) {
// if we are here, the stream was reaped and errors raised
// if we continue.
return;
}
// unpause if needed.
// FIXME(dlc) client.stream.isPaused() causes 0.10 to fail
client.stream.resume();
Expand Down Expand Up @@ -785,7 +798,8 @@ Client.prototype.processInbound = function() {
} else if ((m = OK.exec(buf)) !== null) {
// Ignore for now..
} else if ((m = ERR.exec(buf)) !== null) {
client.emit('error', new NatsError(m[1], NATS_PROTOCOL_ERR));
client.processErr(m[1]);
return;
} else if ((m = PONG.exec(buf)) !== null) {
var cb = client.pongs && client.pongs.shift();
if (cb) { cb(); } // FIXME: Should we check for exceptions?
Expand Down Expand Up @@ -835,6 +849,11 @@ Client.prototype.processInbound = function() {
tlsOpts[key] = client.options.tls[key];
}
}
// if we have a stream, this is from an old connection, reap it
if(client.stream) {
client.stream.removeAllListeners();
client.stream.end();
}
client.stream = tls.connect(tlsOpts, function() {
client.flushPending();
});
Expand Down Expand Up @@ -986,6 +1005,25 @@ Client.prototype.processMsg = function() {
}
};

/**
* ProcessErr processes any error messages from the server
*
* @api private
*/
Client.prototype.processErr = function(s) {
// current NATS clients, will raise an error and close on any errors
// except stale connection and permission errors
var m = s ? s.toLowerCase() : '';
if(m.indexOf(STALE_CONNECTION_ERR) !== -1) {
this.scheduleReconnect();
} else if(m.indexOf(PERMISSIONS_ERR) !== -1) {
this.emit('permission_error', new NatsError(s, NATS_PROTOCOL_ERR));
} else {
this.emit('error', new NatsError(s, NATS_PROTOCOL_ERR));
this.closeStream();
}
};

/**
* Push a new cluster server.
*
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nats",
"version": "0.7.18",
"version": "0.7.20",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",
"keywords": [
"nats",
Expand Down
Loading

0 comments on commit 43604b2

Please sign in to comment.