From 6fd839780c7f9853e461418cf944ad9b216f7178 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 3 May 2017 19:24:34 -0500 Subject: [PATCH] FIX #103 - Implementing discovered servers api. Added test Extended nats_server_control to allow easy creation of a cluster Bumped versions --- lib/nats.js | 33 ++++++- package.json | 2 +- test/dyncluster.js | 143 ++++++++++++++++++++++++++++ test/support/nats_server_control.js | 85 ++++++++++++++++- 4 files changed, 256 insertions(+), 7 deletions(-) create mode 100644 test/dyncluster.js diff --git a/lib/nats.js b/lib/nats.js index 5eae5d25..e52c00bb 100644 --- a/lib/nats.js +++ b/lib/nats.js @@ -23,7 +23,7 @@ var net = require('net'), * Constants */ -var VERSION = '0.7.16', +var VERSION = '0.7.18', DEFAULT_PORT = 4222, DEFAULT_PRE = 'nats://localhost:', @@ -482,7 +482,8 @@ Client.prototype.sendConnect = function() { 'lang' : 'node', 'version' : VERSION, 'verbose' : this.options.verbose, - 'pedantic': this.options.pedantic + 'pedantic': this.options.pedantic, + 'protocol': 1, }; if (this.user !== undefined) { cs.user = this.user; @@ -494,7 +495,6 @@ Client.prototype.sendConnect = function() { if (this.options.name !== undefined) { cs.name = this.options.name; } - // If we enqueued requests before we received INFO from the server, or we // reconnected, there be other data pending, write this immediately instead // of adding it to the queue. @@ -797,6 +797,33 @@ Client.prototype.processInbound = function() { if (client.checkTLSMismatch() === true) { return; } + + // Always try to read the connect_urls from info + if(client.info.connect_urls && client.info.connect_urls.length > 0) { + // don't add duplicates + var known = []; + client.servers.forEach(function(server) { + known.push(server.url.href); + }); + // add new ones + var toAdd = []; + client.info.connect_urls.forEach(function(server) { + var u = 'nats://' + server; + if(known.indexOf(u) === -1) { + toAdd.push(new Server(url.parse(u))); + } + }); + + if(toAdd.length > 0) { + if(client.options.noRandomize !== true) { + shuffle(toAdd); + } + toAdd.forEach(function(s) { + client.servers.push(s); + }); + } + } + // Process first INFO if (client.infoReceived === false) { // Switch over to TLS as needed. diff --git a/package.json b/package.json index f3d355a5..d1e648d9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nats", - "version": "0.7.16", + "version": "0.7.18", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", diff --git a/test/dyncluster.js b/test/dyncluster.js new file mode 100644 index 00000000..3bee44cd --- /dev/null +++ b/test/dyncluster.js @@ -0,0 +1,143 @@ +/* jslint node: true */ +/* global describe: false, before: false, after: false, it: false, afterEach: false, beforeEach: false */ +/* jshint -W030 */ +'use strict'; + +var NATS = require ('../'), + nsc = require('./support/nats_server_control'), + should = require('should'); + +describe('Dynamic Cluster - Connect URLs', function() { + this.timeout(10000); + + // this to enable per test cleanup + var servers; + // Shutdown our servers + afterEach(function() { + nsc.stop_cluster(servers); + servers = []; + }); + + it('adding cluster performs update', function(done) { + var route_port = 54220; + var port = 54221; + + // start a new cluster with single server + servers = nsc.start_cluster([port], route_port, function() { + should(servers.length).be.equal(1); + + // connect the client + var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100}); + nc.on('connect', function () { + // start adding servers + process.nextTick(function () { + var others = nsc.add_member_with_delay([port + 1, port + 2], route_port, 250, function () { + // verify that 2 servers were added + should(others.length).be.equal(2); + others.forEach(function (o) { + // add them so they can be reaped + servers.push(o); + }); + // give some time for the server to send infos + setTimeout(function () { + // we should know of 3 servers - the one we connected and the 2 we added + should(nc.servers.length).be.equal(3); + done(); + }, 1000); + }); + }); + }); + }); + }); + + it('added servers are shuffled at the end of the list', function(done) { + var route_port = 54320; + var port = 54321; + // start a cluster of one server + var ports = []; + for (var i = 0; i < 10; i++) { + ports.push(port + i); + } + var map = {}; + servers = nsc.start_cluster(ports, route_port, function () { + should(servers.length).be.equal(10); + + var connectCount = 0; + function connectAndRecordPorts(check) { + var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100}); + nc.on('connect', function () { + var have = []; + nc.servers.forEach(function (s) { + have.push(s.url.port); + }); + + connectCount++; + should.ok(have[0] == port); + var key = have.join("_"); + map[key] = map[key] ? map[key] + 1 : 1; + nc.close(); + if (connectCount === 10) { + check(); + } + }); + } + + // we should have more than one property if there was randomization + function check() { + var keys = Object.getOwnPropertyNames(map); + should.ok(keys.length > 1); + done(); + } + // connect several times... + for (var i = 0; i < 10; i++) { + connectAndRecordPorts(check); + } + }); + }); + + it('added servers not shuffled when noRandomize is set', function(done) { + var route_port = 54320; + var port = 54321; + // start a cluster of one server + var ports = []; + for (var i = 0; i < 10; i++) { + ports.push(port + i); + } + var map = {}; + servers = nsc.start_cluster(ports, route_port, function () { + should(servers.length).be.equal(10); + + var connectCount = 0; + function connectAndRecordPorts(check) { + var nc = NATS.connect({'port': port, 'reconnectTimeWait': 100, 'noRandomize': true}); + nc.on('connect', function () { + var have = []; + nc.servers.forEach(function (s) { + have.push(s.url.port); + }); + + connectCount++; + should.ok(have[0] == port); + var key = have.join("_"); + map[key] = map[key] ? map[key] + 1 : 1; + nc.close(); + if (connectCount === 10) { + check(); + } + }); + } + + // we should have more than one property if there was randomization + function check() { + var keys = Object.getOwnPropertyNames(map); + should.ok(keys.length === 1); + should.ok(map[keys[0]] === 10); + done(); + } + // connect several times... + for (var i = 0; i < 10; i++) { + connectAndRecordPorts(check); + } + }); + }); +}); diff --git a/test/support/nats_server_control.js b/test/support/nats_server_control.js index 92d66fdf..20b847d4 100644 --- a/test/support/nats_server_control.js +++ b/test/support/nats_server_control.js @@ -7,7 +7,7 @@ var net = require('net'); var SERVER = (process.env.TRAVIS) ? 'gnatsd/gnatsd' : 'gnatsd'; var DEFAULT_PORT = 4222; -exports.start_server = function(port, opt_flags, done) { +function start_server(port, opt_flags, done) { if (!port) { port = DEFAULT_PORT; } @@ -94,10 +94,89 @@ exports.start_server = function(port, opt_flags, done) { }); return server; -}; +} + +exports.start_server = start_server; -exports.stop_server = function(server) { +function stop_server(server) { if (server !== undefined) { server.kill(); } +} + +exports.stop_server = stop_server; + +// starts a number of servers in a cluster at the specified ports. +// must call with at least one port. +function start_cluster(ports, route_port, opt_flags, done) { + if (typeof opt_flags == 'function') { + done = opt_flags; + opt_flags = null; + } + var servers = []; + var started = 0; + var server = add_member(ports[0], route_port, route_port, function() { + started++; + servers.push(server); + if(started === ports.length) { + done(); + } + }); + + var others = ports.slice(1); + others.forEach(function(p){ + var s = add_member(p, route_port, p+1000, opt_flags, function() { + started++; + servers.push(s); + if(started === ports.length) { + done(); + } + }); + }); + return servers; +} + +// adds more cluster members, if more than one server is added additional +// servers are added after the specified delay. +function add_member_with_delay(ports, route_port, delay, opt_flags, done) { + if (typeof opt_flags == 'function') { + done = opt_flags; + opt_flags = null; + } + var servers = []; + ports.forEach(function(p, i) { + setTimeout(function() { + var s = add_member(p, route_port, p+1000, opt_flags, function() { + servers.push(s); + if(servers.length === ports.length) { + done(); + } + }); + }, i*delay); + }); + + return servers; +} +exports.add_member_with_delay = add_member_with_delay; + +function add_member(port, route_port, cluster_port, opt_flags, done) { + if (typeof opt_flags == 'function') { + done = opt_flags; + opt_flags = null; + } + opt_flags = opt_flags || []; + var opts = JSON.parse(JSON.stringify(opt_flags)); + opts.push('--routes', 'nats://localhost:' + route_port); + opts.push('--cluster', 'nats://localhost:' + cluster_port); + return start_server(port, opts, done); +} + +exports.start_cluster = start_cluster; +exports.add_member = add_member; + +exports.stop_cluster = function(servers) { + servers.forEach(function(s) { + stop_server(s); + }); }; +