Skip to content

Commit

Permalink
Merge pull request #143 from nats-io/discovered-servers
Browse files Browse the repository at this point in the history
FIX #103 - Implementing discovered servers API
  • Loading branch information
Alberto Ricart authored May 5, 2017
2 parents 6eba3ea + 6fd8397 commit 25cddf6
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 7 deletions.
33 changes: 30 additions & 3 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var net = require('net'),
* Constants
*/

var VERSION = '0.7.16',
var VERSION = '0.7.18',

DEFAULT_PORT = 4222,
DEFAULT_PRE = 'nats://localhost:',
Expand Down Expand Up @@ -482,7 +482,8 @@ Client.prototype.sendConnect = function() {
'lang' : 'node',
'version' : VERSION,
'verbose' : this.options.verbose,
'pedantic': this.options.pedantic
'pedantic': this.options.pedantic,
'protocol': 1,
};
if (this.user !== undefined) {
cs.user = this.user;
Expand All @@ -494,7 +495,6 @@ Client.prototype.sendConnect = function() {
if (this.options.name !== undefined) {
cs.name = this.options.name;
}

// If we enqueued requests before we received INFO from the server, or we
// reconnected, there be other data pending, write this immediately instead
// of adding it to the queue.
Expand Down Expand Up @@ -797,6 +797,33 @@ Client.prototype.processInbound = function() {
if (client.checkTLSMismatch() === true) {
return;
}

// Always try to read the connect_urls from info
if(client.info.connect_urls && client.info.connect_urls.length > 0) {
// don't add duplicates
var known = [];
client.servers.forEach(function(server) {
known.push(server.url.href);
});
// add new ones
var toAdd = [];
client.info.connect_urls.forEach(function(server) {
var u = 'nats://' + server;
if(known.indexOf(u) === -1) {
toAdd.push(new Server(url.parse(u)));
}
});

if(toAdd.length > 0) {
if(client.options.noRandomize !== true) {
shuffle(toAdd);
}
toAdd.forEach(function(s) {
client.servers.push(s);
});
}
}

// Process first INFO
if (client.infoReceived === false) {
// Switch over to TLS as needed.
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nats",
"version": "0.7.16",
"version": "0.7.18",
"description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system",
"keywords": [
"nats",
Expand Down
143 changes: 143 additions & 0 deletions test/dyncluster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/* jslint node: true */
/* global describe: false, before: false, after: false, it: false, afterEach: false, beforeEach: false */
/* jshint -W030 */
'use strict';

var NATS = require ('../'),
nsc = require('./support/nats_server_control'),
should = require('should');

describe('Dynamic Cluster - Connect URLs', function() {
this.timeout(10000);

// this to enable per test cleanup
var servers;
// Shutdown our servers
afterEach(function() {
nsc.stop_cluster(servers);
servers = [];
});

it('adding cluster performs update', function(done) {
var route_port = 54220;
var port = 54221;

// start a new cluster with single server
servers = nsc.start_cluster([port], route_port, function() {
should(servers.length).be.equal(1);

// connect the client
var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100});
nc.on('connect', function () {
// start adding servers
process.nextTick(function () {
var others = nsc.add_member_with_delay([port + 1, port + 2], route_port, 250, function () {
// verify that 2 servers were added
should(others.length).be.equal(2);
others.forEach(function (o) {
// add them so they can be reaped
servers.push(o);
});
// give some time for the server to send infos
setTimeout(function () {
// we should know of 3 servers - the one we connected and the 2 we added
should(nc.servers.length).be.equal(3);
done();
}, 1000);
});
});
});
});
});

it('added servers are shuffled at the end of the list', function(done) {
var route_port = 54320;
var port = 54321;
// start a cluster of one server
var ports = [];
for (var i = 0; i < 10; i++) {
ports.push(port + i);
}
var map = {};
servers = nsc.start_cluster(ports, route_port, function () {
should(servers.length).be.equal(10);

var connectCount = 0;
function connectAndRecordPorts(check) {
var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100});
nc.on('connect', function () {
var have = [];
nc.servers.forEach(function (s) {
have.push(s.url.port);
});

connectCount++;
should.ok(have[0] == port);
var key = have.join("_");
map[key] = map[key] ? map[key] + 1 : 1;
nc.close();
if (connectCount === 10) {
check();
}
});
}

// we should have more than one property if there was randomization
function check() {
var keys = Object.getOwnPropertyNames(map);
should.ok(keys.length > 1);
done();
}
// connect several times...
for (var i = 0; i < 10; i++) {
connectAndRecordPorts(check);
}
});
});

it('added servers not shuffled when noRandomize is set', function(done) {
var route_port = 54320;
var port = 54321;
// start a cluster of one server
var ports = [];
for (var i = 0; i < 10; i++) {
ports.push(port + i);
}
var map = {};
servers = nsc.start_cluster(ports, route_port, function () {
should(servers.length).be.equal(10);

var connectCount = 0;
function connectAndRecordPorts(check) {
var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100, 'noRandomize': true});
nc.on('connect', function () {
var have = [];
nc.servers.forEach(function (s) {
have.push(s.url.port);
});

connectCount++;
should.ok(have[0] == port);
var key = have.join("_");
map[key] = map[key] ? map[key] + 1 : 1;
nc.close();
if (connectCount === 10) {
check();
}
});
}

// we should have more than one property if there was randomization
function check() {
var keys = Object.getOwnPropertyNames(map);
should.ok(keys.length === 1);
should.ok(map[keys[0]] === 10);
done();
}
// connect several times...
for (var i = 0; i < 10; i++) {
connectAndRecordPorts(check);
}
});
});
});
85 changes: 82 additions & 3 deletions test/support/nats_server_control.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var net = require('net');
var SERVER = (process.env.TRAVIS) ? 'gnatsd/gnatsd' : 'gnatsd';
var DEFAULT_PORT = 4222;

exports.start_server = function(port, opt_flags, done) {
function start_server(port, opt_flags, done) {
if (!port) {
port = DEFAULT_PORT;
}
Expand Down Expand Up @@ -94,10 +94,89 @@ exports.start_server = function(port, opt_flags, done) {
});

return server;
};
}

exports.start_server = start_server;

exports.stop_server = function(server) {
function stop_server(server) {
if (server !== undefined) {
server.kill();
}
}

exports.stop_server = stop_server;

// starts a number of servers in a cluster at the specified ports.
// must call with at least one port.
function start_cluster(ports, route_port, opt_flags, done) {
if (typeof opt_flags == 'function') {
done = opt_flags;
opt_flags = null;
}
var servers = [];
var started = 0;
var server = add_member(ports[0], route_port, route_port, function() {
started++;
servers.push(server);
if(started === ports.length) {
done();
}
});

var others = ports.slice(1);
others.forEach(function(p){
var s = add_member(p, route_port, p+1000, opt_flags, function() {
started++;
servers.push(s);
if(started === ports.length) {
done();
}
});
});
return servers;
}

// adds more cluster members, if more than one server is added additional
// servers are added after the specified delay.
function add_member_with_delay(ports, route_port, delay, opt_flags, done) {
if (typeof opt_flags == 'function') {
done = opt_flags;
opt_flags = null;
}
var servers = [];
ports.forEach(function(p, i) {
setTimeout(function() {
var s = add_member(p, route_port, p+1000, opt_flags, function() {
servers.push(s);
if(servers.length === ports.length) {
done();
}
});
}, i*delay);
});

return servers;
}
exports.add_member_with_delay = add_member_with_delay;

function add_member(port, route_port, cluster_port, opt_flags, done) {
if (typeof opt_flags == 'function') {
done = opt_flags;
opt_flags = null;
}
opt_flags = opt_flags || [];
var opts = JSON.parse(JSON.stringify(opt_flags));
opts.push('--routes', 'nats://localhost:' + route_port);
opts.push('--cluster', 'nats://localhost:' + cluster_port);
return start_server(port, opts, done);
}

exports.start_cluster = start_cluster;
exports.add_member = add_member;

exports.stop_cluster = function(servers) {
servers.forEach(function(s) {
stop_server(s);
});
};

0 comments on commit 25cddf6

Please sign in to comment.