diff --git a/CHANGELOG.md b/CHANGELOG.md index d652754..c242f93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Change Log +## 19.0.0 +- I am not aware of any breaking changes in this release, but emitting error events asynchronously could have subtle side effects, hence the major release +- Deprecate session 'cancelled' event in favour of 'cancel' (both will work) +- Refactor reconnection and resubscription code +- Emit errors asynchronously to prevent them being caught by the amqplib main accept loop +- Fix bug which throw an exception in the error handler when a close event was emitted with no error argument + ## 18.0.1 - Removed console.log when the channel pool destroyed a channel diff --git a/README.md b/README.md index e2fc677..71709ce 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,9 @@ The reason Rascal nacks the message is because the alternatives are to leave the ## Very Important Section About Event Handling -[amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. There are four places where you should do this +[amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. It is insufficient to register a global uncaughtException handler - doing so without registering individual handlers will prevent your application from crashing, but also prevent Rascal from recovering. + +There are four places where you need to register error handlers. 1. Immediately after obtaining a broker instance @@ -1358,7 +1360,7 @@ If the message has not been auto-acknowledged you should ackOrNack it. **If you The RabbitMQ broker may [cancel](https://www.rabbitmq.com/consumer-cancel.html) the consumer if the queue is deleted or the node on which the queue is located fails. [amqplib](https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume) handles this by delivering a `null` message. When Rascal receives the null message it will -1. Emit a `cancelled` event from the subscription. +1. Emit a `cancel` event from the subscription. 1. Emit an `error` event from the subscription if the `cancel` event was not handled 1. Optionally attempt to resubscribe as per normal retry configuration. If the queue was deleted rather than being failed over, the queue will not automatically be re-created and retry attempts will fail indefinitely. diff --git a/lib/amqp/Publication.js b/lib/amqp/Publication.js index 133da3b..3a40575 100644 --- a/lib/amqp/Publication.js +++ b/lib/amqp/Publication.js @@ -95,24 +95,27 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn, session._removePausedListener(); if (err) return session.emit('error', err, messageId); if (session.isAborted()) return abortPublish(channel, messageId); - const errorHandler = _.once(handleChannelError.bind(null, channel, messageId, session, config)); + + const disconnectionHandler = makeDisconnectionHandler(channel, messageId, session, config); const returnHandler = session.emit.bind(session, 'return'); - addListeners(channel, errorHandler, returnHandler); + addListeners(channel, disconnectionHandler, returnHandler); + try { session._startPublish(); + publishFn(channel, buffer, publishConfig, (err, ok) => { session._endPublish(); if (err) { - destroyChannel(channel, errorHandler, returnHandler); + destroyChannel(channel, disconnectionHandler, returnHandler); return session.emit('error', err, messageId); } - ok ? returnChannel(channel, errorHandler, returnHandler) : deferReturnChannel(channel, errorHandler, returnHandler); + ok ? returnChannel(channel, disconnectionHandler, returnHandler) : deferReturnChannel(channel, disconnectionHandler, returnHandler); session.emit('success', messageId); }); } catch (err) { - returnChannel(channel, errorHandler, returnHandler); + returnChannel(channel, disconnectionHandler, returnHandler); return session.emit('error', err, messageId); } }); @@ -125,19 +128,19 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn, returnChannelFn(channel); } - function returnChannel(channel, errorHandler, returnHandler) { - removeListeners(channel, errorHandler, returnHandler); + function returnChannel(channel, disconnectionHandler, returnHandler) { + removeListeners(channel, disconnectionHandler, returnHandler); returnChannelFn(channel); } - function deferReturnChannel(channel, errorHandler, returnHandler) { + function deferReturnChannel(channel, disconnectionHandler, returnHandler) { channel.once('drain', () => { - returnChannel(channel, errorHandler, returnHandler); + returnChannel(channel, disconnectionHandler, returnHandler); }); } - function destroyChannel(channel, errorHandler, returnHandler) { - removeListeners(channel, errorHandler, returnHandler); + function destroyChannel(channel, disconnectionHandler, returnHandler) { + removeListeners(channel, disconnectionHandler, returnHandler); destroyChannelFn(channel); } @@ -163,19 +166,29 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn, } } -function addListeners(channel, errorHandler, returnHandler) { - channel.on('error', errorHandler); +function makeDisconnectionHandler(channel, messageId, session, config) { + return _.once((err) => { + // Use setImmediate to avoid amqplib accept loop swallowing errors + setImmediate(() => (err + // Treat close events with errors as error events + ? handleChannelError(channel, messageId, session, config, err) + : handleChannelClose(channel, messageId, session, config))); + }); +} + +function addListeners(channel, disconnectionHandler, returnHandler) { + channel.on('error', disconnectionHandler); channel.on('return', returnHandler); - channel.connection.once('error', errorHandler); - channel.connection.once('close', errorHandler); + channel.connection.once('error', disconnectionHandler); + channel.connection.once('close', disconnectionHandler); } -function removeListeners(channel, errorHandler, returnHandler) { +function removeListeners(channel, disconnectionHandler, returnHandler) { channel.removeAllListeners('drain'); - channel.removeListener('error', errorHandler); + channel.removeListener('error', disconnectionHandler); channel.removeListener('return', returnHandler); - channel.connection.removeListener('error', errorHandler); - channel.connection.removeListener('close', errorHandler); + channel.connection.removeListener('error', disconnectionHandler); + channel.connection.removeListener('close', disconnectionHandler); } function publishToExchange(channel, content, config, next) { @@ -252,3 +265,8 @@ function handleChannelError(borked, messageId, emitter, config, err) { debug('Channel error: %s during publication of message: %s to %s using channel: %s', err.message, messageId, config.name, borked._rascal_id); emitter.emit('error', err, messageId); } + +function handleChannelClose(borked, messageId, emitter, config) { + debug('Channel closed during publication of message: %s to %s using channel: %s', messageId, config.name, borked._rascal_id); + emitter.emit('close', messageId); +} diff --git a/lib/amqp/Subscription.js b/lib/amqp/Subscription.js index ea3daab..03e6d73 100644 --- a/lib/amqp/Subscription.js +++ b/lib/amqp/Subscription.js @@ -61,13 +61,13 @@ function Subscription(broker, vhost, subscriptionConfig, counter) { _configureQos(config, channel, (err) => { if (err) return done(err); - const removeErrorHandlers = attachErrorHandlers(channel, session, config); - const onMessage = _onMessage.bind(null, session, config, removeErrorHandlers); + const removeDisconnectionHandlers = attachDisconnectionHandlers(channel, session, config); + const onMessage = _onMessage.bind(null, session, config, removeDisconnectionHandlers); channel.consume(config.source, onMessage, config.options, (err, response) => { if (err) { debug('Error subscribing to %s using channel: %s. %s', config.source, channel._rascal_id, err.message); - removeErrorHandlers(); + removeDisconnectionHandlers(); return done(err); } session._open(channel, response.consumerTag, (err) => { @@ -88,8 +88,8 @@ function Subscription(broker, vhost, subscriptionConfig, counter) { async.series(qos, next); } - function _onMessage(session, config, removeErrorHandlers, message) { - if (!message) return handleConsumerCancel(session, config, removeErrorHandlers); + function _onMessage(session, config, removeDisconnectionHandlers, message) { + if (!message) return handleConsumerCancel(session, config, removeDisconnectionHandlers); debug('Received message: %s from queue: %s', message.properties.messageId, config.queue); session._incrementUnacknowledgeMessageCount(message.fields.consumerTag); @@ -249,51 +249,66 @@ function Subscription(broker, vhost, subscriptionConfig, counter) { if (err) session.emit('error', err); } - function attachErrorHandlers(channel, session, config) { + function attachDisconnectionHandlers(channel, session, config) { /* eslint-disable no-use-before-define */ const connection = channel.connection; - const removeErrorHandlers = _.once(() => { - channel.removeListener('error', errorHandler); + const removeDisconnectionHandlers = _.once(() => { + channel.removeListener('error', disconnectionHandler); channel.on('error', (err) => { debug('Suppressing error on cancelled session: %s to prevent connection errors. %s', channel._rascal_id, err.message); }); - connection.removeListener('error', errorHandler); - connection.removeListener('close', errorHandler); + connection.removeListener('error', disconnectionHandler); + connection.removeListener('close', disconnectionHandler); + }); + + const disconnectionHandler = makeDisconnectionHandler(session, config, removeDisconnectionHandlers); + channel.on('error', disconnectionHandler); + connection.once('error', disconnectionHandler); + connection.once('close', disconnectionHandler); + return removeDisconnectionHandlers; + } + + function makeDisconnectionHandler(session, config, removeDisconnectionHandlers) { + return _.once((err) => { + // Use setImmediate to avoid amqplib accept loop swallowing errors + setImmediate(() => (err + // Treat close events with errors as error events + ? handleChannelError(session, config, removeDisconnectionHandlers, 0, err) + : handleChannelClose(session, config, removeDisconnectionHandlers, 0))); }); - const errorHandler = _.once(handleChannelError.bind(null, session, config, removeErrorHandlers, 0)); - channel.on('error', errorHandler); - connection.once('error', errorHandler); - connection.once('close', errorHandler); - return removeErrorHandlers; } - function handleChannelError(session, config, removeErrorHandlers, attempts, err) { + function handleChannelError(session, config, removeDisconnectionHandler, attempt, err) { debug('Handling channel error: %s from %s using channel: %s', err.message, config.name, session._getRascalChannelId()); - if (removeErrorHandlers) removeErrorHandlers(); + if (removeDisconnectionHandler) removeDisconnectionHandler(); session.emit('error', err); - config.retry - && subscribeNow(session, config, (err) => { - if (!err) return; - const delay = timer.next(); - debug('Will attempt resubscription(%d) to %s in %dms', attempts + 1, config.name, delay); - session._schedule(handleChannelError.bind(null, session, config, null, attempts + 1, err), delay); - }); + retrySubscription(session, config, attempt + 1); } - function handleConsumerCancel(session, config, removeErrorHandlers) { + function handleChannelClose(session, config, removeDisconnectionHandler, attempt) { + debug('Handling channel close from %s using channel: %s', config.name, session._getRascalChannelId()); + removeDisconnectionHandler(); + session.emit('close'); + retrySubscription(session, config, attempt + 1); + } + + function handleConsumerCancel(session, config, removeDisconnectionHandler) { debug('Received consumer cancel from %s using channel: %s', config.name, session._getRascalChannelId()); - removeErrorHandlers(); + removeDisconnectionHandler(); + const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name)); + session.emit('cancelled', cancelErr) || session.emit('cancel', cancelErr) || session.emit('error', cancelErr); session._close((err) => { if (err) debug('Error cancelling subscription: %s', err.message); - const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name)); - session.emit('cancelled', cancelErr) || session.emit('error', cancelErr); - config.retry - && subscribeNow(session, config, (err) => { - if (!err) return; - const delay = timer.next(); - debug('Will attempt resubscription(%d) to %s in %dms', 1, config.name, delay); - session._schedule(handleChannelError.bind(null, session, config, null, 1, err), delay); - }); + retrySubscription(session, config, 1); + }); + } + + function retrySubscription(session, config, attempt) { + config.retry && subscribeNow(session, config, (err) => { + if (!err) return; + const delay = timer.next(); + debug('Will attempt resubscription(%d) to %s in %dms', attempt, config.name, delay); + session._schedule(handleChannelError.bind(null, session, config, null, attempt, err), delay); }); } } diff --git a/lib/amqp/Vhost.js b/lib/amqp/Vhost.js index e1837ac..367f3b1 100644 --- a/lib/amqp/Vhost.js +++ b/lib/amqp/Vhost.js @@ -57,7 +57,7 @@ function Vhost(vhostConfig, components) { connectionConfig = ctx.connectionConfig; timer = backoff(ctx.connectionConfig.retry); - attachErrorHandlers(config); + attachDisconnectionHandlers(config); forwardRabbitMQConnectionEvents(); ensureChannelPools(); resumeChannelAllocation(); @@ -471,11 +471,21 @@ function Vhost(vhostConfig, components) { ); } - function attachErrorHandlers(config) { + function attachDisconnectionHandlers(config) { connection.removeAllListeners('error'); - const errorHandler = _.once(handleConnectionError.bind(null, connection, config)); - connection.on('error', errorHandler); - connection.on('close', errorHandler); + const disconectionHandler = makeDisconnectionHandler(config); + connection.on('error', disconectionHandler); + connection.on('close', disconectionHandler); + } + + function makeDisconnectionHandler(config) { + return _.once((err) => { + // Use setImmediate to avoid amqplib accept loop swallowing errors + setImmediate(() => (err + // Treat close events with errors as error events + ? handleConnectionError(connection, config, err) + : handleConnectionClose(connection, config))); + }); } function handleConnectionError(borked, config, err) { @@ -484,12 +494,24 @@ function Vhost(vhostConfig, components) { connection = undefined; self.emit('disconnect'); self.emit('error', err, self.getConnectionDetails()); - connectionConfig.retry - && self.init((err) => { - if (!err) return; - const delay = timer.next(); - debug('Will attempt reconnection in in %dms', delay); - reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay); - }); + retryConnection(borked, config); + } + + function handleConnectionClose(borked, config) { + debug('Handling connection close initially from connection: %s, %s', borked._rascal_id, connectionConfig.loggableUrl); + pauseChannelAllocation(); + connection = undefined; + self.emit('disconnect'); + self.emit('close', self.getConnectionDetails()); + retryConnection(borked, config); + } + + function retryConnection(borked, config) { + connectionConfig.retry && self.init((err) => { + if (!err) return; + const delay = timer.next(); + debug('Will attempt reconnection in in %dms', delay); + reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay); + }); } } diff --git a/test/subscriptions.tests.js b/test/subscriptions.tests.js index 3151fdc..25789ce 100644 --- a/test/subscriptions.tests.js +++ b/test/subscriptions.tests.js @@ -7,695 +7,729 @@ const testConfig = require('../lib/config/tests'); const Broker = require('..').Broker; const AmqpUtils = require('./utils/amqputils'); -describe( - 'Subscriptions', - () => { - let broker; - let amqputils; - let namespace; - let vhosts; - let publications; - let subscriptions; - - beforeEach((test, done) => { - namespace = uuid(); - vhosts = { - '/': { - namespace, - exchanges: { - e1: { - assert: true, - }, - e2: { - assert: true, - }, - dlx: { - assert: true, - type: 'fanout', - }, - xx: { - assert: true, - }, +describe('Subscriptions', () => { + let broker; + let amqputils; + let namespace; + let vhosts; + let publications; + let subscriptions; + + beforeEach((test, done) => { + namespace = uuid(); + vhosts = { + '/': { + namespace, + exchanges: { + e1: { + assert: true, }, - queues: { - q1: { - assert: true, - options: { - arguments: { - 'x-dead-letter-exchange': 'dlx', - }, + e2: { + assert: true, + }, + dlx: { + assert: true, + type: 'fanout', + }, + xx: { + assert: true, + }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'dlx', }, }, - q2: { - assert: true, - }, - q3: { - assert: true, - }, - dlq: { - assert: true, - }, - 'q_10.10.10.10': { - assert: true, - }, }, - bindings: { - b1: { - source: 'e1', - destination: 'q1', - bindingKey: 'foo', - }, - b2: { - source: 'e2', - destination: 'q2', - bindingKey: 'bar', - }, - b3: { - source: 'e1', - destination: 'q3', - bindingKey: 'baz', - }, - b4: { - source: 'e1', - destination: 'q_10.10.10.10', - bindingKey: 'buz', - }, - b5: { - source: 'dlx', - destination: 'dlq', - }, + q2: { + assert: true, + }, + q3: { + assert: true, + }, + dlq: { + assert: true, + }, + 'q_10.10.10.10': { + assert: true, }, }, - }; - - publications = { - p1: { - vhost: '/', - exchange: 'e1', - routingKey: 'foo', - }, - p2: { - vhost: '/', - exchange: 'e2', - routingKey: 'bar', - }, - p3: { - vhost: '/', - exchange: 'xx', - }, - p4: { - vhost: '/', - exchange: 'e1', - routingKey: 'buz', - }, - }; - - subscriptions = { - s1: { - vhost: '/', - queue: 'q1', - }, - s2: { - vhost: '/', - queue: 'q2', - }, - s3: { - vhost: '/', - queue: 'q3', - }, - s4: { - vhost: '/', - queue: 'q1', - deprecated: true, - }, - s5: { - vhost: '/', - queue: 'q_10.10.10.10', + bindings: { + b1: { + source: 'e1', + destination: 'q1', + bindingKey: 'foo', + }, + b2: { + source: 'e2', + destination: 'q2', + bindingKey: 'bar', + }, + b3: { + source: 'e1', + destination: 'q3', + bindingKey: 'baz', + }, + b4: { + source: 'e1', + destination: 'q_10.10.10.10', + bindingKey: 'buz', + }, + b5: { + source: 'dlx', + destination: 'dlq', + }, }, - }; - - amqplib.connect((err, connection) => { - if (err) return done(err); - amqputils = AmqpUtils.init(connection); - done(); - }); - }); - - afterEach((test, done) => { - amqputils.disconnect(() => { - if (broker) return broker.nuke(done); - done(); - }); - }); + }, + }; + + publications = { + p1: { + vhost: '/', + exchange: 'e1', + routingKey: 'foo', + }, + p2: { + vhost: '/', + exchange: 'e2', + routingKey: 'bar', + }, + p3: { + vhost: '/', + exchange: 'xx', + }, + p4: { + vhost: '/', + exchange: 'e1', + routingKey: 'buz', + }, + }; + + subscriptions = { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + s3: { + vhost: '/', + queue: 'q3', + }, + s4: { + vhost: '/', + queue: 'q1', + deprecated: true, + }, + s5: { + vhost: '/', + queue: 'q_10.10.10.10', + }, + }; + + amqplib.connect((err, connection) => { + if (err) return done(err); + amqputils = AmqpUtils.init(connection); + done(); + }); + }); + + afterEach((test, done) => { + amqputils.disconnect(() => { + if (broker) return broker.nuke(done); + done(); + }); + }); + + it('should report unknown subscriptions', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.subscribe('does-not-exist', (err) => { + assert.ok(err); + assert.strictEqual(err.message, 'Unknown subscription: does-not-exist'); + done(); + }); + }, + ); + }); - it('should report unknown subscriptions', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should consume to text/plain messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.subscribe('does-not-exist', (err) => { - assert.ok(err); - assert.strictEqual(err.message, 'Unknown subscription: does-not-exist'); - done(); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + assert.strictEqual(message.properties.contentType, 'text/plain'); + assert.strictEqual(content, 'test message'); + done(); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should consume to text/plain messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should report repeated calls to ackOrNack via callback if specified', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - assert.strictEqual(message.properties.contentType, 'text/plain'); - assert.strictEqual(content, 'test message'); - done(); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + ackOrNack((err) => { + assert.strictEqual('ackOrNack should only be called once per message', err.message); }); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should report repeated calls to ackOrNack via callback if specified', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should report repeated calls to ackOrNack via error event if no callback specified', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - ackOrNack((err) => { - assert.strictEqual('ackOrNack should only be called once per message', err.message); - }); - done(); - }); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + ackOrNack(); + }); + subscription.on('error', (err) => { + assert.strictEqual('ackOrNack should only be called once per message', err.message); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should report repeated calls to ackOrNack via error event if no callback specified', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should not consume messages before a listener is bound', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); + setTimeout(() => { subscription.on('message', (message, content, ackOrNack) => { ackOrNack(); - ackOrNack(); - }); - subscription.on('error', (err) => { - assert.strictEqual('ackOrNack should only be called once per message', err.message); + assert.strictEqual(content, 'test message'); done(); }); - }); - }); - }, - ); - }); - - it('should not consume messages before a listener is bound', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - setTimeout(() => { - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - assert.strictEqual(content, 'test message'); - done(); - }); - }, 500); - }); + }, 500); }); - }, - ); - }); - - it('should consume to text/other messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish( - 'p1', - 'test message', - { - options: { - contentType: 'text/csv', - }, - }, - (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - assert.strictEqual(message.properties.contentType, 'text/csv'); - assert.strictEqual(content.toString(), 'test message'); - done(); - }); - }); - }, - ); - }, - ); - }); + }); + }, + ); + }); - it('should consume to whatever/whatever messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish( - 'p1', - 'test message', - { - options: { - contentType: 'x-foo-bar/blah', - }, - }, - (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - assert.strictEqual(message.properties.contentType, 'x-foo-bar/blah'); - assert.strictEqual(content.toString(), 'test message'); - done(); - }); - }); + it('should consume to text/other messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish( + 'p1', + 'test message', + { + options: { + contentType: 'text/csv', }, - ); - }, - ); - }); - - it('should consume to JSON messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', { message: 'test message' }, (err) => { + }, + (err) => { assert.ifError(err); broker.subscribe('s1', (err, subscription) => { assert.ifError(err); subscription.on('message', (message, content, ackOrNack) => { ackOrNack(); - assert.strictEqual(message.properties.contentType, 'application/json'); - assert.strictEqual(content.message, 'test message'); + assert.strictEqual(message.properties.contentType, 'text/csv'); + assert.strictEqual(content.toString(), 'test message'); done(); }); }); - }); - }, - ); - }); - - it('should consume to Buffer messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', Buffer.from('test message'), (err) => { + }, + ); + }, + ); + }); + + it('should consume to whatever/whatever messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish( + 'p1', + 'test message', + { + options: { + contentType: 'x-foo-bar/blah', + }, + }, + (err) => { assert.ifError(err); broker.subscribe('s1', (err, subscription) => { assert.ifError(err); subscription.on('message', (message, content, ackOrNack) => { ackOrNack(); - assert.strictEqual(message.properties.contentType, undefined); + assert.strictEqual(message.properties.contentType, 'x-foo-bar/blah'); assert.strictEqual(content.toString(), 'test message'); done(); }); }); - }); - }, - ); - }); - - it('should not consume invalid messages when no invalid content/message listener is bound', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + }, + ); + }, + ); + }); + + it('should consume to JSON messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', { message: 'test message' }, (err) => { assert.ifError(err); - amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('error', () => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); - }); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + assert.strictEqual(message.properties.contentType, 'application/json'); + assert.strictEqual(content.message, 'test message'); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should not consume an invalid messages messages when a listener is bound to invalid_content', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should consume to Buffer messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', Buffer.from('test message'), (err) => { assert.ifError(err); - amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('invalid_content', (err) => { - assert(err); - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessage('q1', namespace, 'not json', done); - }); - }); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + assert.strictEqual(message.properties.contentType, undefined); + assert.strictEqual(content.toString(), 'test message'); + done(); }); }); + }); + }, + ); + }); - broker.on('error', (err) => { - assert.strictEqual(err.code, 'ETIMEDOUT'); - }); - }, - ); - }); - - it('should not consume an invalid messages messages when a listener is bound to invalid_message', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should not consume invalid messages when no invalid content/message listener is bound', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { assert.ifError(err); - amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('invalid_message', (err) => { - assert(err); - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessage('q1', namespace, 'not json', done); - }); + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('error', () => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); - }); - }); - - broker.on('error', (err) => { - assert.strictEqual(err.code, 'ETIMEDOUT'); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should consume an invalid message when a listener acks it', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should not consume an invalid messages messages when a listener is bound to invalid_content', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { assert.ifError(err); - amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('invalid_content', (err, message, ackOrNack) => { - assert(err); - ackOrNack(() => { - setTimeout(() => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('invalid_content', (err) => { + assert(err); + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessage('q1', namespace, 'not json', done); + }); + }); + }); + }); + + broker.on('error', (err) => { + assert.strictEqual(err.code, 'ETIMEDOUT'); + }); + }, + ); + }); + + it('should not consume an invalid messages messages when a listener is bound to invalid_message', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('invalid_message', (err) => { + assert(err); + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessage('q1', namespace, 'not json', done); + }); + }); + }); + }); + + broker.on('error', (err) => { + assert.strictEqual(err.code, 'ETIMEDOUT'); + }); + }, + ); + }); + + it('should consume an invalid message when a listener acks it', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('invalid_content', (err, message, ackOrNack) => { + assert(err); + ackOrNack(() => { + setTimeout(() => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); }); }); - }); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should consume an invalid message when a listener nacks it', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should consume an invalid message when a listener nacks it', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { assert.ifError(err); - amqputils.publishMessage('e1', namespace, Buffer.from('not json'), { routingKey: 'foo', contentType: 'application/json' }, (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('invalid_content', (err, message, ackOrNack) => { - assert(err); - ackOrNack(err, () => { - setTimeout(() => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('invalid_content', (err, message, ackOrNack) => { + assert(err); + ackOrNack(err, () => { + setTimeout(() => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); }); }); - }); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should force the content type when specified', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - contentType: 'text/plain', - }, + it('should force the content type when specified', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + contentType: 'text/plain', }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', { message: 'test message' }, (err) => { assert.ifError(err); - broker.publish('p1', { message: 'test message' }, (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - assert.strictEqual(message.properties.contentType, 'application/json'); - assert.strictEqual(content, '{"message":"test message"}'); - done(); - }); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + assert.strictEqual(message.properties.contentType, 'application/json'); + assert.strictEqual(content, '{"message":"test message"}'); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should filter subscriptions by routing key', (test, done) => { - createBroker( - { - vhosts, - publications: { - p1: { - vhost: '/', - exchange: 'e1', - routingKey: 'bar', - }, + it('should filter subscriptions by routing key', (test, done) => { + createBroker( + { + vhosts, + publications: { + p1: { + vhost: '/', + exchange: 'e1', + routingKey: 'bar', }, - subscriptions, }, - (err, broker) => { + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', () => { - assert.ok(false, 'Should not have received any messages'); - }); + subscription.on('message', () => { + assert.ok(false, 'Should not have received any messages'); }); - setTimeout(done, 500); }); - }, - ); - }); + setTimeout(done, 500); + }); + }, + ); + }); - it('should consume auto acknowledged messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - options: { - noAck: true, - }, + it('should consume auto acknowledged messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + options: { + noAck: true, }, }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message) => { - assert.ok(message); - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message) => { + assert.ok(message); + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should not consume unacknowledged messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should not consume unacknowledged messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message) => { - assert.ok(message); - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessage('q1', namespace, 'test message', done); - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message) => { + assert.ok(message); + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessage('q1', namespace, 'test message', done); }); }); }); + }); - broker.on('error', (err) => { - assert.strictEqual(err.code, 'ETIMEDOUT'); - }); - }, - ); - }); + broker.on('error', (err) => { + assert.strictEqual(err.code, 'ETIMEDOUT'); + }); + }, + ); + }); - it('should consume acknowledged messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should consume acknowledged messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + setTimeout(() => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); + }); + }, 100); + }); + }); + }); + }, + ); + }); + it('should consume all acknowledged messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { + assert.ifError(err); + let count = 0; broker.subscribe('s1', (err, subscription) => { assert.ifError(err); subscription.on('message', (message, content, ackOrNack) => { assert.ok(message); - ackOrNack(); + if (++count < 10) return; + ackOrNack(null, { all: true }); setTimeout(() => { broker.shutdown((err) => { assert.ifError(err); @@ -704,65 +738,66 @@ describe( }, 100); }); }); - }); - }, - ); - }); - - it('should consume all acknowledged messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + }, + ); + }, + ); + }); + + it('should consume rejected messages by default', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { - assert.ifError(err); - let count = 0; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - if (++count < 10) return; - ackOrNack(null, { all: true }); - setTimeout(() => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); - }, 100); + + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('reject')); + setTimeout(() => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); - }); - }, - ); - }, - ); - }); + }, 100); + }); + }); + }); + }, + ); + }); - it('should consume rejected messages by default', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + it('should consume all rejected messages when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { assert.ifError(err); broker.subscribe('s1', (err, subscription) => { assert.ifError(err); + let count = 0; subscription.on('message', (message, content, ackOrNack) => { assert.ok(message); - ackOrNack(new Error('reject')); + if (++count < 10) return; + ackOrNack(new Error('reject'), { strategy: 'nack', all: true }); setTimeout(() => { broker.shutdown((err) => { assert.ifError(err); @@ -771,1977 +806,1971 @@ describe( }, 100); }); }); - }); - }, - ); - }); - - it('should consume all rejected messages when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { - assert.ifError(err); - - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - let count = 0; - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - if (++count < 10) return; - ackOrNack(new Error('reject'), { strategy: 'nack', all: true }); - setTimeout(() => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); - }, 100); - }); - }); - }, - ); - }, - ); - }); - - it('should reject messages when requested', (test, done) => { - createBroker( - { - vhosts: { - '/': { - namespace, - exchanges: { - e1: { - assert: true, - }, - e2: { - assert: true, - }, + }, + ); + }, + ); + }); + + it('should reject messages when requested', (test, done) => { + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, }, - queues: { - q1: { - assert: true, - options: { - arguments: { - 'x-dead-letter-exchange': 'e2', - }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', }, }, - q2: { - assert: true, - }, }, - bindings: { - b1: { - source: 'e1', - destination: 'q1', - bindingKey: 'foo', - }, - b2: { - source: 'e2', - destination: 'q2', - bindingKey: 'foo', - }, + q2: { + assert: true, }, }, - }, - publications: _.pick(publications, 'p1'), - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - }, - s2: { - vhost: '/', - queue: 'q2', + bindings: { + b1: { + source: 'e1', + destination: 'q1', + bindingKey: 'foo', + }, + b2: { + source: 'e2', + destination: 'q2', + bindingKey: 'foo', + }, }, }, }, - (err, broker) => { + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + }, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('reject')); - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('reject')); }); + }); - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - done(); - }); + broker.subscribe('s2', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should requeue messages when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should requeue messages when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - const messages = {}; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 10) return ackOrNack(new Error('retry'), { strategy: 'nack', requeue: true }); - ackOrNack(); - done(); - }); + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + if (messages[message.properties.messageId] < 10) return ackOrNack(new Error('retry'), { strategy: 'nack', requeue: true }); + ackOrNack(); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should requeue messages when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should requeue messages when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + if (messages[message.properties.messageId] < 10) return ackOrNack(new Error('retry'), { strategy: 'nack', requeue: true }); + ackOrNack(); + done(); + }); + }); + }); + }, + ); + }); + + it('should requeue all messages when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { assert.ifError(err); - const messages = {}; broker.subscribe('s1', (err, subscription) => { assert.ifError(err); + let count = 0; subscription.on('message', (message, content, ackOrNack) => { assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 10) return ackOrNack(new Error('retry'), { strategy: 'nack', requeue: true }); + if (++count < 10) return; + if (count === 10) return ackOrNack(new Error('reject'), { strategy: 'nack', all: true, requeue: true }); + + assert.ok(message.fields.redelivered); ackOrNack(); - done(); + if (count === 20) done(); }); }); - }); - }, - ); - }); - - it('should requeue all messages when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + }, + ); + }, + ); + }); + + it('should defer requeueing messages when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - let count = 0; - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - if (++count < 10) return; - if (count === 10) return ackOrNack(new Error('reject'), { strategy: 'nack', all: true, requeue: true }); + let numberOfMessages = 0; + const startTime = new Date().getTime(); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + numberOfMessages++; + if (numberOfMessages < 10) return ackOrNack(new Error('retry'), { strategy: 'nack', defer: 100, requeue: true }); + ackOrNack(); + const stopTime = new Date().getTime(); + assert.ok(stopTime - startTime >= 900, 'Retry was not deferred'); + done(); + }); + }); + }); + }, + ); + }); - assert.ok(message.fields.redelivered); - ackOrNack(); - if (count === 20) done(); - }); - }); + it('should count redeliveries', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + redeliveries: { + counter: 'inMemory', }, - ); - }, - ); - }); - - it('should defer requeueing messages when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, + }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - let numberOfMessages = 0; - const startTime = new Date().getTime(); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - numberOfMessages++; - if (numberOfMessages < 10) return ackOrNack(new Error('retry'), { strategy: 'nack', defer: 100, requeue: true }); + let errors = 0; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', (message, content, ackOrNack) => { + if (message.properties.headers.rascal.redeliveries < 10) throw new Error('oh no'); ackOrNack(); - const stopTime = new Date().getTime(); - assert.ok(stopTime - startTime >= 900, 'Retry was not deferred'); - done(); + subscription.cancel(done); + }) + .on('error', () => { + if (errors++ > 10) done(new Error('Redeliveries were not counted')); }); - }); }); - }, - ); - }); + }); + }, + ); + }); - it('should count redeliveries', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - redeliveries: { - counter: 'inMemory', - }, + it('should notify when redeliveries limit is exceeded', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + redeliveries: { + limit: 5, + counter: 'inMemory', }, }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - let errors = 0; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', (message, content, ackOrNack) => { - if (message.properties.headers.rascal.redeliveries < 10) throw new Error('oh no'); - ackOrNack(); - subscription.cancel(done); - }) - .on('error', () => { - if (errors++ > 10) done(new Error('Redeliveries were not counted')); + let errors = 0; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', () => { + throw new Error('oh no'); + }) + .on('redeliveries_exceeded', (err) => { + assert(err); + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessage('q1', namespace, 'test message', done); }); - }); + }) + .on('error', () => { + if (errors++ > 5) done(new Error('Redeliveries were exceeded')); + }); }); - }, - ); - }); + }); - it('should notify when redeliveries limit is exceeded', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - redeliveries: { - limit: 5, - counter: 'inMemory', - }, + broker.on('error', (err) => { + assert.strictEqual(err.code, 'ETIMEDOUT'); + }); + }, + ); + }); + + it('should notify when redeliveries error is exceeded', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + redeliveries: { + limit: 5, + counter: 'inMemory', }, }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - let errors = 0; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - throw new Error('oh no'); - }) - .on('redeliveries_exceeded', (err) => { - assert(err); - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessage('q1', namespace, 'test message', done); - }); - }) - .on('error', () => { - if (errors++ > 5) done(new Error('Redeliveries were exceeded')); + let errors = 0; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', () => { + throw new Error('oh no'); + }) + .on('redeliveries_error', (err) => { + assert(err); + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessage('q1', namespace, 'test message', done); }); - }); + }) + .on('error', () => { + if (errors++ > 5) done(new Error('Redeliveries were exceeded')); + }); }); broker.on('error', (err) => { assert.strictEqual(err.code, 'ETIMEDOUT'); }); - }, - ); - }); + }); + }, + ); + }); - it('should notify when redeliveries error is exceeded', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - redeliveries: { - limit: 5, - counter: 'inMemory', - }, + it('should consume a poison messages when no listener is bound', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + redeliveries: { + limit: 5, + counter: 'inMemory', }, }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - let errors = 0; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - throw new Error('oh no'); - }) - .on('redeliveries_error', (err) => { - assert(err); - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessage('q1', namespace, 'test message', done); - }); - }) - .on('error', () => { - if (errors++ > 5) done(new Error('Redeliveries were exceeded')); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', () => { + throw new Error('oh no'); + }) + .on('error', (err) => { + if (!/Message .* has exceeded 5 redeliveries/.test(err.message)) return; + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); - }); - - broker.on('error', (err) => { - assert.strictEqual(err.code, 'ETIMEDOUT'); - }); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should consume a poison messages when no listener is bound', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - redeliveries: { - limit: 5, - counter: 'inMemory', - }, + it('should consume a poision message when a listener acks it', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + redeliveries: { + limit: 5, + counter: 'inMemory', }, }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - throw new Error('oh no'); - }) - .on('error', (err) => { - if (!/Message .* has exceeded 5 redeliveries/.test(err.message)) return; - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); + let errors = 0; + subscription + .on('message', () => { + throw new Error('oh no'); + }) + .on('redeliveries_exceeded', (err, message, ackOrNack) => { + assert(err); + ackOrNack(() => { + setTimeout(() => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); + }); }); }); - }); + }) + .on('error', () => { + if (errors++ > 5) done(new Error('Redeliveries were exceeded')); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should consume a poision message when a listener acks it', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - redeliveries: { - limit: 5, - counter: 'inMemory', - }, - }, - }, - }, - (err, broker) => { - assert.ifError(err); + it('should republish messages to queue when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - let errors = 0; - subscription - .on('message', () => { - throw new Error('oh no'); - }) - .on('redeliveries_exceeded', (err, message, ackOrNack) => { - assert(err); - ackOrNack(() => { - setTimeout(() => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); - }); - }); - }) - .on('error', () => { - if (errors++ > 5) done(new Error('Redeliveries were exceeded')); - }); + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + if (messages[message.properties.messageId] < 10) return ackOrNack({ message: 'republish me', code: 'red' }, { strategy: 'republish' }); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 9); + assert.strictEqual(message.properties.headers.rascal.error.message, 'republish me'); + assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should republish messages to queue when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should republish messages with periods in the queue name', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p4', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - const messages = {}; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 10) return ackOrNack({ message: 'republish me', code: 'red' }, { strategy: 'republish' }); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 9); - assert.strictEqual(message.properties.headers.rascal.error.message, 'republish me'); - assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); - done(); - }); + const messages = {}; + broker.subscribe('s5', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + if (messages[message.properties.messageId] < 10) return ackOrNack({ message: 'republish me', code: 'red' }, { strategy: 'republish' }); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q_10.10.10.10')].republished, 9); + assert.strictEqual(message.properties.headers.rascal.error.message, 'republish me'); + assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should republish messages with periods in the queue name', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p4', 'test message', (err) => { - assert.ifError(err); + it('should truncate error messages when republishing', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { + assert.ifError(err); - const messages = {}; - broker.subscribe('s5', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 10) return ackOrNack({ message: 'republish me', code: 'red' }, { strategy: 'republish' }); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q_10.10.10.10')].republished, 9); - assert.strictEqual(message.properties.headers.rascal.error.message, 'republish me'); - assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); - done(); - }); + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + if (messages[message.properties.messageId] < 10) return ackOrNack(new Error(_.pad('x', 10000, 'x')), { strategy: 'republish' }); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 9); + assert.strictEqual(message.properties.headers.rascal.error.message.length, 1024); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should truncate error messages when republishing', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should maintain original fields, properties and headers when republished', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', { options: { persistent: true, headers: { foo: 'bar' } } }, (err, publication) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); + publication.on('success', (messageId) => { const messages = {}; broker.subscribe('s1', (err, subscription) => { assert.ifError(err); subscription.on('message', (message, content, ackOrNack) => { assert.ok(message); messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 10) return ackOrNack(new Error(_.pad('x', 10000, 'x')), { strategy: 'republish' }); + if (messages[message.properties.messageId] < 2) return ackOrNack(new Error('republish'), { strategy: 'republish' }); ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 9); - assert.strictEqual(message.properties.headers.rascal.error.message.length, 1024); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 1); + assert.strictEqual(message.properties.headers.foo, 'bar'); + assert.strictEqual(message.properties.messageId, messageId); + assert.strictEqual(message.fields.routingKey, 'foo'); + assert.strictEqual(message.properties.deliveryMode, 2); done(); }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should maintain original fields, properties and headers when republished', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', { options: { persistent: true, headers: { foo: 'bar' } } }, (err, publication) => { - assert.ifError(err); + it('should cap republishes when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', assert.ifError); - publication.on('success', (messageId) => { - const messages = {}; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 2) return ackOrNack(new Error('republish'), { strategy: 'republish' }); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].republished, 1); - assert.strictEqual(message.properties.headers.foo, 'bar'); - assert.strictEqual(message.properties.messageId, messageId); - assert.strictEqual(message.fields.routingKey, 'foo'); - assert.strictEqual(message.properties.deliveryMode, 2); - done(); - }); - }); + let count = 0; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + count++; + ackOrNack(new Error('republish'), { + strategy: 'republish', + attempts: 5, }); }); - }, - ); - }); + }); - it('should cap republishes when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + setTimeout(() => { + assert.strictEqual(count, 6); + done(); + }, 500); + }, + ); + }); + + it('should defer republishing messages when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', assert.ifError); - let count = 0; + let numberOfMessages = 0; + const startTime = new Date().getTime(); broker.subscribe('s1', (err, subscription) => { assert.ifError(err); subscription.on('message', (message, content, ackOrNack) => { - count++; - ackOrNack(new Error('republish'), { - strategy: 'republish', - attempts: 5, - }); - }); - }); - - setTimeout(() => { - assert.strictEqual(count, 6); - done(); - }, 500); - }, - ); - }); - - it('should defer republishing messages when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - - let numberOfMessages = 0; - const startTime = new Date().getTime(); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - numberOfMessages++; - if (numberOfMessages < 10) return ackOrNack(new Error('republish'), { strategy: 'republish', defer: 100 }); - ackOrNack(); - const stopTime = new Date().getTime(); - assert.ok(stopTime - startTime >= 900, 'Republish was not deferred'); - done(); - }); + assert.ok(message); + numberOfMessages++; + if (numberOfMessages < 10) return ackOrNack(new Error('republish'), { strategy: 'republish', defer: 100 }); + ackOrNack(); + const stopTime = new Date().getTime(); + assert.ok(stopTime - startTime >= 900, 'Republish was not deferred'); + done(); }); }); - }, - ); - }); - - it('should immediately nack republished messages when requested', (test, done) => { - createBroker( - { - vhosts: { - '/': { - namespace, - exchanges: { - e1: { - assert: true, - }, - e2: { - assert: true, - }, + }); + }, + ); + }); + + it('should immediately nack republished messages when requested', (test, done) => { + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, }, - queues: { - q1: { - assert: true, - options: { - arguments: { - 'x-dead-letter-exchange': 'e2', - }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', }, }, - q2: { - assert: true, - }, }, - bindings: { - b1: { - source: 'e1', - destination: 'q1', - }, - b2: { - source: 'e2', - destination: 'q2', - }, + q2: { + assert: true, }, }, - }, - publications: _.pick(publications, 'p1'), - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - }, - s2: { - vhost: '/', - queue: 'q2', + bindings: { + b1: { + source: 'e1', + destination: 'q1', + }, + b2: { + source: 'e2', + destination: 'q2', + }, }, }, }, - (err, broker) => { + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + }, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - let count = 0; - subscription.on('message', (message, content, ackOrNack) => { - assert.strictEqual(++count, 1); - assert.ok(message); - ackOrNack(new Error('immediate nack'), { - strategy: 'republish', - immediateNack: true, - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + let count = 0; + subscription.on('message', (message, content, ackOrNack) => { + assert.strictEqual(++count, 1); + assert.ok(message); + ackOrNack(new Error('immediate nack'), { + strategy: 'republish', + immediateNack: true, }); }); + }); - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - done(); - }); + broker.subscribe('s2', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + done(); }); }); - }, - ); - }); - - it('should immediately nack republished messages delivered from a queue whose name contains periods', (test, done) => { - createBroker( - { - vhosts: { - '/': { - namespace, - exchanges: { - e1: { - assert: true, - }, - e2: { - assert: true, - }, + }); + }, + ); + }); + + it('should immediately nack republished messages delivered from a queue whose name contains periods', (test, done) => { + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, }, - queues: { - 'q_10.10.10.10': { - assert: true, - options: { - arguments: { - 'x-dead-letter-exchange': 'e2', - }, + }, + queues: { + 'q_10.10.10.10': { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', }, }, - q2: { - assert: true, - }, }, - bindings: { - b1: { - source: 'e1', - destination: 'q_10.10.10.10', - }, - b2: { - source: 'e2', - destination: 'q2', - }, + q2: { + assert: true, }, }, - }, - publications: _.pick(publications, 'p1'), - subscriptions: { - s1: { - vhost: '/', - queue: 'q_10.10.10.10', - }, - s2: { - vhost: '/', - queue: 'q2', + bindings: { + b1: { + source: 'e1', + destination: 'q_10.10.10.10', + }, + b2: { + source: 'e2', + destination: 'q2', + }, }, }, }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - let count = 0; - subscription.on('message', (message, content, ackOrNack) => { - assert.strictEqual(++count, 1); - assert.ok(message); - ackOrNack(new Error('immediate nack'), { - strategy: 'republish', - immediateNack: true, - }); - }); - }); - - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - done(); - }); - }); - }); - }, - ); - }); - - it('should forward messages to publication when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q_10.10.10.10', + }, + s2: { + vhost: '/', + queue: 'q2', + }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', assert.ifError); broker.subscribe('s1', (err, subscription) => { assert.ifError(err); + let count = 0; subscription.on('message', (message, content, ackOrNack) => { + assert.strictEqual(++count, 1); assert.ok(message); - ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: 'p2' }); + ackOrNack(new Error('immediate nack'), { + strategy: 'republish', + immediateNack: true, + }); }); }); broker.subscribe('s2', (err, subscription) => { assert.ifError(err); subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); - assert.strictEqual(message.properties.headers.CC.length, 1); - assert.strictEqual(message.properties.headers.CC[0], `${broker.qualify('/', 'q1')}.bar`); - assert.strictEqual(message.properties.headers.rascal.error.message, 'forward me'); - assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should nack the original message if forwarding fails', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', assert.ifError); + it('should forward messages to publication when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', assert.ifError); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: '/xx' }); - }); - subscription.on('error', (err) => { - assert.ok(/Message: .* was forwarded to publication: \/xx, but was returned/.test(err.message), err.message); - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: 'p2' }); }); + }); - broker.subscribe('/dlq', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - assert.strictEqual(content, 'test message'); - done(); - }); + broker.subscribe('s2', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); + assert.strictEqual(message.properties.headers.CC.length, 1); + assert.strictEqual(message.properties.headers.CC[0], `${broker.qualify('/', 'q1')}.bar`); + assert.strictEqual(message.properties.headers.rascal.error.message, 'forward me'); + assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); + done(); }); - }, - ); - }); + }); + }, + ); + }); - it('should forward messages from a queue with period characters in the name', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { - assert.ifError(err); - broker.publish('p4', 'test message', assert.ifError); + it('should nack the original message if forwarding fails', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', assert.ifError); - broker.subscribe('s5', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: 'p2' }); - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: '/xx' }); + }); + subscription.on('error', (err) => { + assert.ok(/Message: .* was forwarded to publication: \/xx, but was returned/.test(err.message), err.message); }); + }); - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q_10.10.10.10')].forwarded, 1); - assert.strictEqual(message.properties.headers.CC.length, 1); - assert.strictEqual(message.properties.headers.CC[0], `${broker.qualify('/', 'q_10.10.10.10')}.bar`); - assert.strictEqual(message.properties.headers.rascal.error.message, 'forward me'); - assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); - done(); - }); + broker.subscribe('/dlq', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(content, 'test message'); + done(); }); - }, - ); - }); + }); + }, + ); + }); - it('should truncate error messages when forwarding', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should forward messages from a queue with period characters in the name', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p4', 'test message', assert.ifError); + + broker.subscribe('s5', (err, subscription) => { assert.ifError(err); - broker.publish('p1', 'test message', assert.ifError); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack({ message: 'forward me', code: 'red' }, { strategy: 'forward', publication: 'p2' }); + }); + }); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error(_.pad('x', 10000, 'x')), { - strategy: 'forward', - publication: 'p2', - }); - }); + broker.subscribe('s2', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q_10.10.10.10')].forwarded, 1); + assert.strictEqual(message.properties.headers.CC.length, 1); + assert.strictEqual(message.properties.headers.CC[0], `${broker.qualify('/', 'q_10.10.10.10')}.bar`); + assert.strictEqual(message.properties.headers.rascal.error.message, 'forward me'); + assert.strictEqual(message.properties.headers.rascal.error.code, 'red'); + done(); }); + }); + }, + ); + }); - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); - assert.strictEqual(message.properties.headers.rascal.error.message.length, 1024); - done(); + it('should truncate error messages when forwarding', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', assert.ifError); + + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error(_.pad('x', 10000, 'x')), { + strategy: 'forward', + publication: 'p2', }); }); - }, - ); - }); + }); - it('should override routing key when forwarding messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + broker.subscribe('s2', (err, subscription) => { assert.ifError(err); - broker.publish('p1', 'test message', assert.ifError); - - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('forward'), { - strategy: 'forward', - publication: 'p1', - options: { routingKey: 'baz' }, - }); - }); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); + assert.strictEqual(message.properties.headers.rascal.error.message.length, 1024); + done(); }); + }); + }, + ); + }); - broker.subscribe('s3', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); - done(); + it('should override routing key when forwarding messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', assert.ifError); + + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('forward'), { + strategy: 'forward', + publication: 'p1', + options: { routingKey: 'baz' }, }); }); - }, - ); - }); + }); - it('should maintain original fields, properties and headers when forwarding messages', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + broker.subscribe('s3', (err, subscription) => { assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); + done(); + }); + }); + }, + ); + }); - let messageId; + it('should maintain original fields, properties and headers when forwarding messages', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); - broker.publish('p1', 'test message', { options: { headers: { foo: 'bar' } } }, (err, publication) => { - assert.ifError(err); - publication.on('success', (_messageId) => { - messageId = _messageId; - }); - }); + let messageId; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('forward'), { - strategy: 'forward', - publication: 'p2', - }); - }); + broker.publish('p1', 'test message', { options: { headers: { foo: 'bar' } } }, (err, publication) => { + assert.ifError(err); + publication.on('success', (_messageId) => { + messageId = _messageId; }); + }); - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); - assert.strictEqual(message.properties.headers.foo, 'bar'); - assert.strictEqual(message.properties.messageId, messageId); - assert.strictEqual(message.fields.routingKey, 'foo'); - done(); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('forward'), { + strategy: 'forward', + publication: 'p2', }); }); - }, - ); - }); + }); - it('should not maintain original routing headers when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + broker.subscribe('s2', (err, subscription) => { assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); + assert.strictEqual(message.properties.headers.foo, 'bar'); + assert.strictEqual(message.properties.messageId, messageId); + assert.strictEqual(message.fields.routingKey, 'foo'); + done(); + }); + }); + }, + ); + }); - let messageId; + it('should not maintain original routing headers when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); - broker.publish('p1', 'test message', { options: { headers: { foo: 'bar' } } }, (err, publication) => { - assert.ifError(err); - publication.on('success', (_messageId) => { - messageId = _messageId; - }); - }); + let messageId; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('forward'), { - strategy: 'forward', - publication: 'p2', - restoreRoutingHeaders: false, - }); - }); + broker.publish('p1', 'test message', { options: { headers: { foo: 'bar' } } }, (err, publication) => { + assert.ifError(err); + publication.on('success', (_messageId) => { + messageId = _messageId; }); + }); - broker.subscribe('s2', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); - assert.strictEqual(message.properties.headers.foo, 'bar'); - assert.strictEqual(message.properties.messageId, messageId); - assert.strictEqual(message.fields.routingKey, 'bar'); - done(); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('forward'), { + strategy: 'forward', + publication: 'p2', + restoreRoutingHeaders: false, }); }); - }, - ); - }); + }); - it('should cap forwards when requested', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + broker.subscribe('s2', (err, subscription) => { assert.ifError(err); - broker.publish('p1', 'test message', assert.ifError); - - let count = 0; - - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - count++; - ackOrNack(new Error('forward'), { - strategy: 'forward', - publication: 'p1', - attempts: 5, - }); - }); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.recovery[broker.qualify('/', 'q1')].forwarded, 1); + assert.strictEqual(message.properties.headers.foo, 'bar'); + assert.strictEqual(message.properties.messageId, messageId); + assert.strictEqual(message.fields.routingKey, 'bar'); + done(); }); + }); + }, + ); + }); - setTimeout(() => { - assert.strictEqual(count, 6); - done(); - }, 500); - }, - ); - }); + it('should cap forwards when requested', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', assert.ifError); - it('should error when forwarding messages to /dev/null', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + let count = 0; + + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.publish('p1', 'test message', (err, publication) => { - assert.ifError(err); - publication.on('success', (messageId) => { - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('forward'), { - strategy: 'forward', - publication: 'p3', - }); - }) - .on('error', (err) => { - assert.ok(err); - assert.strictEqual(`Message: ${messageId} was forwarded to publication: p3, but was returned`, err.message); - done(); - }); - }); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + count++; + ackOrNack(new Error('forward'), { + strategy: 'forward', + publication: 'p1', + attempts: 5, }); }); - }, - ); - }); + }); - it('should error on unknown recovery strategy', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + setTimeout(() => { + assert.strictEqual(count, 6); + done(); + }, 500); + }, + ); + }); + + it('should error when forwarding messages to /dev/null', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err, publication) => { assert.ifError(err); - broker.publish('p1', 'test message', (err, publication) => { - assert.ifError(err); - publication.on('success', (messageId) => { - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('unknown'), { strategy: 'foo' }); - }) - .on('error', (err) => { - assert.ok(err); - assert.strictEqual(`Error recovering message: ${messageId}. No such strategy: foo.`, err.message); - done(); + publication.on('success', (messageId) => { + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription + .on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('forward'), { + strategy: 'forward', + publication: 'p3', }); - }); + }) + .on('error', (err) => { + assert.ok(err); + assert.strictEqual(`Message: ${messageId} was forwarded to publication: p3, but was returned`, err.message); + done(); + }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should chain recovery strategies', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should error on unknown recovery strategy', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err, publication) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - - const messages = {}; + publication.on('success', (messageId) => { broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - ackOrNack(new Error('retry'), [{ strategy: 'republish', attempts: 5 }, { strategy: 'ack' }], () => { - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - if (messages[message.properties.messageId] < 6) return; - setTimeout(() => { - assert.strictEqual(messages[message.properties.messageId], 6); - done(); - }, 500); + subscription + .on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('unknown'), { strategy: 'foo' }); + }) + .on('error', (err) => { + assert.ok(err); + assert.strictEqual(`Error recovering message: ${messageId}. No such strategy: foo.`, err.message); + done(); }); - }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should not rollback message when shutting down broker after ack', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should chain recovery strategies', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - const messages = {}; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + ackOrNack(new Error('retry'), [{ strategy: 'republish', attempts: 5 }, { strategy: 'ack' }], () => { messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - ackOrNack(() => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); - }); + if (messages[message.properties.messageId] < 6) return; + setTimeout(() => { + assert.strictEqual(messages[message.properties.messageId], 6); + done(); + }, 500); }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should not rollback message when shutting down broker after ack', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should not rollback message when shutting down broker after ack', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - const messages = {}; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - ackOrNack(new Error('Oh Noes'), [{ strategy: 'ack' }], () => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + ackOrNack(() => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should nack messages when all recovery strategies have been attempted', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should not rollback message when shutting down broker after ack', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - const messages = {}; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.ok(message); - messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; - ackOrNack(new Error('retry'), [], () => { - broker.shutdown((err) => { - assert.ifError(err); - amqputils.assertMessageAbsent('q1', namespace, done); - }); + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + ackOrNack(new Error('Oh Noes'), [{ strategy: 'ack' }], () => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should limit concurrent messages using consumer prefetch', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - prefetch: 5, - }, - }, - }, - (err, broker) => { + it('should nack messages when all recovery strategies have been attempted', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { - assert.ifError(err); - const messages = []; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert(message); - messages.push(ackOrNack); - if (messages.length === 5) { - setTimeout(() => { - assert.strictEqual(messages.length, 5); - subscription.cancel(done); - setTimeout(() => { - messages.forEach((ackOrNack) => ackOrNack()); - }, 1); - }, 500); - } + + const messages = {}; + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert.ok(message); + messages[message.properties.messageId] = messages[message.properties.messageId] ? messages[message.properties.messageId] + 1 : 1; + ackOrNack(new Error('retry'), [], () => { + broker.shutdown((err) => { + assert.ifError(err); + amqputils.assertMessageAbsent('q1', namespace, done); }); }); - }, - ); - }, - ); - }); + }); + }); + }); + }, + ); + }); - it('should limit concurrent messages using channel prefetch', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - channelPrefetch: 5, - }, + it('should limit concurrent messages using consumer prefetch', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + prefetch: 5, }, }, - (err, broker) => { - assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { + assert.ifError(err); + const messages = []; + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - const messages = []; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert(message); - messages.push(ackOrNack); - if (messages.length === 5) { + subscription.on('message', (message, content, ackOrNack) => { + assert(message); + messages.push(ackOrNack); + if (messages.length === 5) { + setTimeout(() => { + assert.strictEqual(messages.length, 5); + subscription.cancel(done); setTimeout(() => { - assert.strictEqual(messages.length, 5); - subscription.cancel(done); - setTimeout(() => { - messages.forEach((ackOrNack) => ackOrNack()); - }, 1); - }, 500); - } - }); + messages.forEach((ackOrNack) => ackOrNack()); + }, 1); + }, 500); + } }); - }, - ); - }, - ); - }); - - it('should limit concurrent messages using dynamic channel prefetch while subscribing', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - channelPrefetch: 1, - }, + }); + }, + ); + }, + ); + }); + + it('should limit concurrent messages using channel prefetch', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + channelPrefetch: 5, }, }, - (err, broker) => { - assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { + assert.ifError(err); + const messages = []; + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - const messages = []; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert(message); - messages.push(ackOrNack); - if (messages.length === 5) { + subscription.on('message', (message, content, ackOrNack) => { + assert(message); + messages.push(ackOrNack); + if (messages.length === 5) { + setTimeout(() => { + assert.strictEqual(messages.length, 5); + subscription.cancel(done); setTimeout(() => { - assert.strictEqual(messages.length, 5); - subscription.cancel(done); - setTimeout(() => { - messages.forEach((ackOrNack) => ackOrNack()); - }, 1); - }, 500); - } - subscription.setChannelPrefetch(5, (err) => { - assert.ifError(err); - }); - }); + messages.forEach((ackOrNack) => ackOrNack()); + }, 1); + }, 500); + } }); - }, - ); - }, - ); - }); - - it('should limit concurrent messages using dynamic channel prefetch before subscription starts', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - channelPrefetch: 1, - }, + }); + }, + ); + }, + ); + }); + + it('should limit concurrent messages using dynamic channel prefetch while subscribing', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + channelPrefetch: 1, }, }, - (err, broker) => { - assert.ifError(err); - async.times( - 10, - (index, next) => { - broker.publish('p1', 'test message', next); - }, - (err) => { + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { + assert.ifError(err); + const messages = []; + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - const messages = []; - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + assert(message); + messages.push(ackOrNack); + if (messages.length === 5) { + setTimeout(() => { + assert.strictEqual(messages.length, 5); + subscription.cancel(done); + setTimeout(() => { + messages.forEach((ackOrNack) => ackOrNack()); + }, 1); + }, 500); + } subscription.setChannelPrefetch(5, (err) => { assert.ifError(err); - - subscription.on('message', (message, content, ackOrNack) => { - assert(message); - messages.push(ackOrNack); - if (messages.length === 5) { - setTimeout(() => { - assert.strictEqual(messages.length, 5); - subscription.cancel(done); - setTimeout(() => { - messages.forEach((ackOrNack) => ackOrNack()); - }, 1); - }, 500); - } - }); }); }); - }, - ); - }, - ); - }); - - it('should emit channel errors', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, + }); + }, + ); + }, + ); + }); + + it('should limit concurrent messages using dynamic channel prefetch before subscription starts', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + channelPrefetch: 1, + }, }, - (err, broker) => { - assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + }, + (err, broker) => { + assert.ifError(err); + async.times( + 10, + (index, next) => { + broker.publish('p1', 'test message', next); + }, + (err) => { assert.ifError(err); - broker.subscribe('s1', { retry: false }, (err, subscription) => { + const messages = []; + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - subscription - .on('message', (message, content, ackOrNack) => { - ackOrNack(); - delete message.__rascal_acknowledged; - ackOrNack(); // trigger a channel error - }) - .on('error', (err) => { - assert.ok(err); - assert.strictEqual('Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"', err.message); - done(); + subscription.setChannelPrefetch(5, (err) => { + assert.ifError(err); + + subscription.on('message', (message, content, ackOrNack) => { + assert(message); + messages.push(ackOrNack); + if (messages.length === 5) { + setTimeout(() => { + assert.strictEqual(messages.length, 5); + subscription.cancel(done); + setTimeout(() => { + messages.forEach((ackOrNack) => ackOrNack()); + }, 1); + }, 500); + } }); + }); }); + }, + ); + }, + ); + }); + + it('should emit channel errors', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { + assert.ifError(err); + broker.subscribe('s1', { retry: false }, (err, subscription) => { + assert.ifError(err); + subscription + .on('message', (message, content, ackOrNack) => { + ackOrNack(); + delete message.__rascal_acknowledged; + ackOrNack(); // trigger a channel error + }) + .on('error', (err) => { + assert.ok(err); + assert.strictEqual('Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"', err.message); + done(); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should not consume messages after unsubscribing', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - options: { - noAck: true, - }, + it('should not consume messages after unsubscribing', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + options: { + noAck: true, }, }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', () => { - assert.ok(false, 'Should not receive messages after unsubscribing'); - }); + subscription.on('message', () => { + assert.ok(false, 'Should not receive messages after unsubscribing'); + }); - subscription.cancel((err) => { + subscription.cancel((err) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - setTimeout(done, 500); - }); + setTimeout(done, 500); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should tollerate repeated unsubscription', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - options: { - noAck: true, - }, + it('should tollerate repeated unsubscription', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + options: { + noAck: true, }, }, }, - (err, broker) => { - assert.ifError(err); + }, + (err, broker) => { + assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - async.timesSeries( - 3, - (index, cb) => { - subscription.cancel(cb); - }, - done, - ); - }); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + async.timesSeries( + 3, + (index, cb) => { + subscription.cancel(cb); + }, + done, + ); + }); + }, + ); + }); + + it('should not warn about emitter leaks', (test, done) => { + const config = { + vhosts, + publications, + subscriptions: {}, + }; + + _.times(11, (i) => { + config.subscriptions[`s${i}`] = { + vhost: '/', + queue: 'q1', + options: { + noAck: true, }, - ); + }; }); - it('should not warn about emitter leaks', (test, done) => { - const config = { - vhosts, - publications, - subscriptions: {}, - }; + createBroker(config, (err, broker) => { + assert.ifError(err); _.times(11, (i) => { - config.subscriptions[`s${i}`] = { - vhost: '/', - queue: 'q1', - options: { - noAck: true, - }, - }; - }); - - createBroker(config, (err, broker) => { - assert.ifError(err); - - _.times(11, (i) => { - broker.subscribe(`s${i}`, (err) => { - assert.ifError(err); - if (i === 10) done(); - }); + broker.subscribe(`s${i}`, (err) => { + assert.ifError(err); + if (i === 10) done(); }); }); }); + }); - it('should attach the subscription vhost to message properties', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions, - }, - (err, broker) => { + it('should attach the subscription vhost to message properties', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - ackOrNack(); - assert.strictEqual(message.properties.headers.rascal.originalVhost, '/'); - done(); - }); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + assert.strictEqual(message.properties.headers.rascal.originalVhost, '/'); + done(); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should emit an error if trying to ack a message after unsubscribing', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: _.defaultsDeep({}, subscriptions, { - s1: { closeTimeout: 100 }, - }), - }, - (err, broker) => { + it('should emit an error if trying to ack a message after unsubscribing', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: _.defaultsDeep({}, subscriptions, { + s1: { closeTimeout: 100 }, + }), + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', (message, content, ackOrNack) => { - subscription.cancel((err) => { - assert.strictEqual(err.code, 'ETIMEDOUT'); - setTimeout(ackOrNack, 200); - }); - }) - .on('error', (err) => { - assert.strictEqual(err.message, 'The channel has been closed. Unable to ack message'); - done(); + subscription + .on('message', (message, content, ackOrNack) => { + subscription.cancel((err) => { + assert.strictEqual(err.code, 'ETIMEDOUT'); + setTimeout(ackOrNack, 200); }); - }); + }) + .on('error', (err) => { + assert.strictEqual(err.message, 'The channel has been closed. Unable to ack message'); + done(); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should emit an error if trying to nack a message after unsubscribing', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: _.defaultsDeep({}, subscriptions, { - s1: { closeTimeout: 100 }, - }), - }, - (err, broker) => { + it('should emit an error if trying to nack a message after unsubscribing', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: _.defaultsDeep({}, subscriptions, { + s1: { closeTimeout: 100 }, + }), + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', (message, content, ackOrNack) => { - subscription.cancel((err) => { - assert.strictEqual(err.code, 'ETIMEDOUT'); - setTimeout(() => ackOrNack(new Error('Oh Noes!')), 200); - }); - }) - .on('error', (err) => { - assert.strictEqual(err.message, 'The channel has been closed. Unable to nack message'); - done(); + subscription + .on('message', (message, content, ackOrNack) => { + subscription.cancel((err) => { + assert.strictEqual(err.code, 'ETIMEDOUT'); + setTimeout(() => ackOrNack(new Error('Oh Noes!')), 200); }); - }); + }) + .on('error', (err) => { + assert.strictEqual(err.message, 'The channel has been closed. Unable to nack message'); + done(); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should symetrically decrypt messages', (test, done) => { - createBroker( - { - vhosts, - publications: { - p1: { - queue: 'q1', - encryption: { - name: 'well-known', + it('should symmetrically decrypt messages', (test, done) => { + createBroker( + { + vhosts, + publications: { + p1: { + queue: 'q1', + encryption: { + name: 'well-known', + key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', + ivLength: 16, + algorithm: 'aes-256-cbc', + }, + }, + }, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + encryption: { + 'well-known': { key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', ivLength: 16, algorithm: 'aes-256-cbc', }, }, }, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - encryption: { - 'well-known': { - key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', - ivLength: 16, - algorithm: 'aes-256-cbc', - }, - }, + }, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + assert.strictEqual(message.properties.contentType, 'application/octet-stream'); + assert.strictEqual(content, 'test message'); + done(); + }); + }); + }); + }, + ); + }); + + it('should report invalid_content when missing encryption profile', (test, done) => { + createBroker( + { + vhosts, + publications: { + p1: { + queue: 'q1', + encryption: { + name: 'not-well-known', + key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', + ivLength: 16, + algorithm: 'aes-256-cbc', }, }, }, - (err, broker) => { + subscriptions: { + s1: { + queue: 'q1', + encryption: {}, + }, + }, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('invalid_content', (err, message, ackOrNack) => { ackOrNack(); - assert.strictEqual(message.properties.contentType, 'application/octet-stream'); - assert.strictEqual(content, 'test message'); + assert.strictEqual(err.message, 'Unknown encryption profile: not-well-known'); done(); }); - }); }); - }, - ); - }); + }); + }, + ); + }); - it('should report invalid_content when missing encryption profile', (test, done) => { - createBroker( - { - vhosts, - publications: { - p1: { - queue: 'q1', - encryption: { - name: 'not-well-known', - key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', + it('should fail with invalid content when encryption errors', (test, done) => { + createBroker( + { + vhosts, + publications: { + p1: { + queue: 'q1', + encryption: { + name: 'well-known', + key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', + ivLength: 16, + algorithm: 'aes-256-cbc', + }, + }, + }, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + encryption: { + 'well-known': { + key: 'aa', ivLength: 16, algorithm: 'aes-256-cbc', }, }, }, - subscriptions: { - s1: { - queue: 'q1', - encryption: {}, - }, - }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('invalid_content', (err, message, ackOrNack) => { - ackOrNack(); - assert.strictEqual(err.message, 'Unknown encryption profile: not-well-known'); - done(); - }); - }); + subscription + .on('message', () => { + assert.ok(false, 'Message should not have been delivered'); + }) + .on('invalid_content', (err, message, ackOrNack) => { + ackOrNack(); + assert.strictEqual(err.message, 'Invalid key length'); + done(); + }); }); - }, - ); - }); + }); + }, + ); + }); - it('should fail with invalid content when encryption errors', (test, done) => { - createBroker( - { - vhosts, - publications: { - p1: { - queue: 'q1', - encryption: { - name: 'well-known', - key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315', - ivLength: 16, - algorithm: 'aes-256-cbc', - }, - }, - }, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - encryption: { - 'well-known': { - key: 'aa', - ivLength: 16, - algorithm: 'aes-256-cbc', - }, - }, - }, + it('should emit cancelled event when the broker cancels the consumer', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.publish('p1', 'test message', (err) => { - assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { + subscription.on('message', () => { + assert.ok(false, 'No messages expected'); + }); + subscription.on('cancelled', (err) => { + assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); + subscription.cancel(done); + }); + subscription.on('subscribed', () => { + amqputils.deleteQueue('q1', namespace, (err) => { assert.ifError(err); - subscription - .on('message', () => { - assert.ok(false, 'Message should not have been delivered'); - }) - .on('invalid_content', (err, message, ackOrNack) => { - ackOrNack(); - assert.strictEqual(err.message, 'Invalid key length'); - done(); - }); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should emit cancelled event when the broker cancels the consumer', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - }, + it('should emit cancelled event when the broker cancels the consumer', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', () => { - assert.ok(false, 'No messages expected'); - }); - subscription.on('cancelled', (err) => { - assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); - subscription.cancel(done); - }); - subscription.on('subscribed', () => { - amqputils.deleteQueue('q1', namespace, (err) => { - assert.ifError(err); - }); + subscription.on('message', () => { + assert.ok(false, 'No messages expected'); + }); + subscription.on('cancel', (err) => { + assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); + subscription.cancel(done); + }); + subscription.on('subscribed', () => { + amqputils.deleteQueue('q1', namespace, (err) => { + assert.ifError(err); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should emit an error event when the broker cancels the consumer and there is no cancelled handler', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - }, + it('should emit an error event when the broker cancels the consumer and there is no cancel handler', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', () => { - assert.ok(false, 'No messages expected'); - }); - subscription.on('error', (err) => { - assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); - subscription.cancel(done); - }); - subscription.on('subscribed', () => { - amqputils.deleteQueue('q1', namespace, (err) => { - assert.ifError(err); - }); + subscription.on('message', () => { + assert.ok(false, 'No messages expected'); + }); + subscription.on('error', (err) => { + assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); + subscription.cancel(done); + }); + subscription.on('subscribed', () => { + amqputils.deleteQueue('q1', namespace, (err) => { + assert.ifError(err); }); }); - }, - ); - }); + }); + }, + ); + }); - it('should resubscribe following a broker cancellation', (test, done) => { - createBroker( - { - vhosts, - publications, - subscriptions: { - s1: { - vhost: '/', - queue: 'q1', - }, + it('should resubscribe following a broker cancellation', (test, done) => { + createBroker( + { + vhosts, + publications, + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', }, }, - (err, broker) => { + }, + (err, broker) => { + assert.ifError(err); + broker.subscribe('s1', (err, subscription) => { assert.ifError(err); - broker.subscribe('s1', (err, subscription) => { - assert.ifError(err); - subscription.on('message', (message, content, ackOrNack) => { - assert.strictEqual(content.toString(), 'ok'); - ackOrNack(); - subscription.cancel(done); - }); - subscription.on('error', (err) => { - assert.ok(/Operation failed: BasicConsume; 404 \(NOT-FOUND\)/.test(err.message), err.message); - }); - subscription.on('cancelled', (err) => { - assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); - amqputils.createQueue('q1', namespace, (err) => { - assert.ifError(err); - amqputils.publishMessageToQueue('q1', namespace, 'ok', {}); - }); + subscription.on('message', (message, content, ackOrNack) => { + assert.strictEqual(content.toString(), 'ok'); + ackOrNack(); + subscription.cancel(done); + }); + subscription.on('error', (err) => { + assert.ok(/Operation failed: BasicConsume; 404 \(NOT-FOUND\)/.test(err.message), err.message); + }); + subscription.on('cancelled', (err) => { + assert.strictEqual(err.message, 'Subscription: s1 was cancelled by the broker'); + amqputils.createQueue('q1', namespace, (err) => { + assert.ifError(err); + amqputils.publishMessageToQueue('q1', namespace, 'ok', {}); }); - subscription.on('subscribed', () => { - amqputils.deleteQueue('q1', namespace, (err) => { - assert.ifError(err); - }); + }); + subscription.on('subscribed', () => { + amqputils.deleteQueue('q1', namespace, (err) => { + assert.ifError(err); }); }); - }, - ); - }); - - function createBroker(config, next) { - config = _.defaultsDeep(config, testConfig); - Broker.create(config, (err, _broker) => { - broker = _broker; - next(err, broker); - }); - } - }, - { timeout: 5000 }, -); + }); + }, + ); + }); + + function createBroker(config, next) { + config = _.defaultsDeep(config, testConfig); + Broker.create(config, (err, _broker) => { + broker = _broker; + next(err, broker); + }); + } +}); diff --git a/test/utils/amqputils.js b/test/utils/amqputils.js index aad4d2d..92df663 100644 --- a/test/utils/amqputils.js +++ b/test/utils/amqputils.js @@ -1,5 +1,7 @@ const assert = require('assert'); const _ = require('lodash'); +const async = require('async'); +const superagent = require('superagent'); module.exports = { init, @@ -91,6 +93,54 @@ function init(connection) { }); } + function waitForConnections(next) { + let connections = []; + let attempts = 0; + async.whilst( + (cb) => { + cb(null, attempts < 100 && connections.length === 0); + }, + (cb) => { + setTimeout(() => { + attempts++; + fetchConnections((err, _connections) => { + if (err) return cb(err); + connections = _connections; + cb(null, connections); + }); + }, 100); + }, + next, + ); + } + + function fetchConnections(next) { + superagent + .get('http://localhost:15672/api/connections') + .auth('guest', 'guest') + .end((err, response) => { + if (err) return next(err); + next(null, response.body); + }); + } + + function closeConnections(connections, reason, next) { + async.each(connections, (connection, cb) => { + closeConnection(connection.name, reason, cb); + }, next); + } + + function closeConnection(name, reason, next) { + superagent + .delete(`http://localhost:15672/api/connections/${name}`) + .auth('guest', 'guest') + .set('x-reason', reason) + .end((err) => { + if (err) return next(err); + next(); + }); + } + return { disconnect, checkExchange: _.curry(checkExchange), @@ -106,5 +156,7 @@ function init(connection) { assertExchangeAbsent: checkExchange.bind(null, false), assertQueuePresent: checkQueue.bind(null, true), assertQueueAbsent: checkQueue.bind(null, false), + closeConnections, + waitForConnections, }; } diff --git a/test/vhost.tests.js b/test/vhost.tests.js index 711bf33..bb37b3d 100644 --- a/test/vhost.tests.js +++ b/test/vhost.tests.js @@ -8,265 +8,280 @@ const testConfig = require('../lib/config/tests'); const Broker = require('..').Broker; const AmqpUtils = require('./utils/amqputils'); -describe( - 'Vhost', - () => { - let broker; - let amqputils; +describe('Vhost', () => { + let broker; + let amqputils; - beforeEach((test, done) => { - amqplib.connect((err, connection) => { - if (err) return done(err); - amqputils = AmqpUtils.init(connection); - done(); - }); + beforeEach((test, done) => { + amqplib.connect((err, connection) => { + if (err) return done(err); + amqputils = AmqpUtils.init(connection); + done(); }); + }); - afterEach((test, done) => { - amqputils.disconnect(() => { - if (broker) return broker.nuke(done); - done(); - }); + afterEach((test, done) => { + amqputils.disconnect(() => { + if (broker) return broker.nuke(done); + done(); }); + }); - it('should timeout connections', (test, done) => { - const namespace = uuid(); - createBroker( - { - vhosts: { - '/': { - connection: { - hostname: '10.255.255.1', - socketOptions: { - timeout: 100, - }, + it('should timeout connections', (test, done) => { + const namespace = uuid(); + createBroker( + { + vhosts: { + '/': { + connection: { + hostname: '10.255.255.1', + socketOptions: { + timeout: 100, }, - namespace, }, + namespace, }, }, - (err) => { - assert.ok(err.message.match('connect ETIMEDOUT')); - done(); - }, - ); - }); + }, + (err) => { + assert.ok(err.message.match('connect ETIMEDOUT')); + done(); + }, + ); + }); - it('should create exchanges', (test, done) => { - const namespace = uuid(); - createBroker( - { - vhosts: { - '/': { - namespace, - exchanges: { - e1: { - assert: true, - }, + it('should create exchanges', (test, done) => { + const namespace = uuid(); + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, }, }, }, }, - (err) => { - assert.ifError(err); - amqputils.assertExchangePresent('e1', namespace, done); - }, - ); - }); - - it( - 'should create objects concurrently', - (test, done) => { - // This test is too slow for CI - if (process.env.CI) return done(); - - function createAllTheThings(concurrency, cb) { - const namespace = uuid(); - const exchanges = new Array(100) - .fill() - .map((_, index) => `e${index + 1}`) - .reduce( - (acc, name) => Object.assign(acc, { - [name]: { - assert: true, - }, - }), - {}, - ); - - const queues = new Array(100) - .fill() - .map((_, index) => `q${index + 1}`) - .reduce( - (acc, name) => Object.assign(acc, { - [name]: { - assert: true, - }, - }), - {}, - ); + }, + (err) => { + assert.ifError(err); + amqputils.assertExchangePresent('e1', namespace, done); + }, + ); + }); - const bindings = new Array(100).fill().map((_, index) => `e${index + 1}[a.b.c] -> q${index + 1}`); + it('should create objects concurrently', (test, done) => { + // This test is too slow for CI + if (process.env.CI) return done(); - const before = Date.now(); - createBroker( - { - vhosts: { - '/': { - concurrency, - namespace, - exchanges, - queues, - bindings, - }, - }, + function createAllTheThings(concurrency, cb) { + const namespace = uuid(); + const exchanges = new Array(100) + .fill() + .map((_, index) => `e${index + 1}`) + .reduce( + (acc, name) => Object.assign(acc, { + [name]: { + assert: true, }, - (err) => { - assert.ifError(err); - const after = Date.now(); - amqputils.assertExchangePresent('e100', namespace, (err) => { - if (err) return cb(err); - broker.nuke((err) => { - cb(err, after - before); - }); - }); + }), + {}, + ); + + const queues = new Array(100) + .fill() + .map((_, index) => `q${index + 1}`) + .reduce( + (acc, name) => Object.assign(acc, { + [name]: { + assert: true, }, - ); - } + }), + {}, + ); - const reps = 5; - const serialTest = (n, cb) => createAllTheThings(1, cb); - const concurrentTest = (n, cb) => createAllTheThings(10, cb); - async.series([(cb) => async.timesSeries(reps, serialTest, cb), (cb) => async.timesSeries(reps, concurrentTest, cb)], (err, results) => { - if (err) return done(err); - const [a, b] = results; - const averageA = a.reduce((a, b) => a + b, 0) / reps; - const averageB = b.reduce((a, b) => a + b, 0) / reps; - assert.ok(averageB < averageA / 2); - return done(); - }); - }, - { timeout: 60000 }, - ); + const bindings = new Array(100).fill().map((_, index) => `e${index + 1}[a.b.c] -> q${index + 1}`); - it('should create queues', (test, done) => { - const namespace = uuid(); + const before = Date.now(); createBroker( { vhosts: { '/': { + concurrency, namespace, - queues: { - q1: { - assert: true, - }, - }, + exchanges, + queues, + bindings, }, }, }, (err) => { assert.ifError(err); - amqputils.assertQueuePresent('q1', namespace, done); + const after = Date.now(); + amqputils.assertExchangePresent('e100', namespace, (err) => { + if (err) return cb(err); + broker.nuke((err) => { + cb(err, after - before); + }); + }); }, ); + } + + const reps = 5; + const serialTest = (n, cb) => createAllTheThings(1, cb); + const concurrentTest = (n, cb) => createAllTheThings(10, cb); + async.series([(cb) => async.timesSeries(reps, serialTest, cb), (cb) => async.timesSeries(reps, concurrentTest, cb)], (err, results) => { + if (err) return done(err); + const [a, b] = results; + const averageA = a.reduce((a, b) => a + b, 0) / reps; + const averageB = b.reduce((a, b) => a + b, 0) / reps; + assert.ok(averageB < averageA / 2); + return done(); }); + }, { timeout: 60000 }); - it('should fail when checking a missing exchange', (test, done) => { - createBroker( - { - vhosts: { - '/': { - exchanges: { - e1: { - assert: false, - check: true, - }, + it('should create queues', (test, done) => { + const namespace = uuid(); + createBroker( + { + vhosts: { + '/': { + namespace, + queues: { + q1: { + assert: true, }, }, }, }, - (err) => { - assert.ok(err); - assert.ok(/NOT-FOUND/.test(err.message), format('%s did not match the expected format', err.message)); - done(); - }, - ); - }); + }, + (err) => { + assert.ifError(err); + amqputils.assertQueuePresent('q1', namespace, done); + }, + ); + }); - it('should fail when checking a missing queue', (test, done) => { - createBroker( - { - vhosts: { - '/': { - queues: { - q1: { - assert: false, - check: true, - }, + it('should fail when checking a missing exchange', (test, done) => { + createBroker( + { + vhosts: { + '/': { + exchanges: { + e1: { + assert: false, + check: true, }, }, }, }, - (err) => { - assert.ok(err); - assert.ok(/NOT-FOUND/.test(err.message), format('%s did not match the expected format', err.message)); - done(); + }, + (err) => { + assert.ok(err); + assert.ok(/NOT-FOUND/.test(err.message), format('%s did not match the expected format', err.message)); + done(); + }, + ); + }); + + it('should fail when checking a missing queue', (test, done) => { + createBroker( + { + vhosts: { + '/': { + queues: { + q1: { + assert: false, + check: true, + }, + }, + }, }, - ); - }); + }, + (err) => { + assert.ok(err); + assert.ok(/NOT-FOUND/.test(err.message), format('%s did not match the expected format', err.message)); + done(); + }, + ); + }); - it('should create bindings', (test, done) => { - const namespace = uuid(); + it('should create bindings', (test, done) => { + const namespace = uuid(); - createBroker( - { - vhosts: { - '/': { - namespace, - exchanges: { - e1: { - assert: true, - }, - e2: { - assert: true, - }, + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, }, - queues: { - q1: { - assert: true, - }, + e2: { + assert: true, }, - bindings: { - b1: { - source: 'e1', - destination: 'e2', - destinationType: 'exchange', - }, - b2: { - source: 'e1', - destination: 'q1', - }, + }, + queues: { + q1: { + assert: true, + }, + }, + bindings: { + b1: { + source: 'e1', + destination: 'e2', + destinationType: 'exchange', + }, + b2: { + source: 'e1', + destination: 'q1', }, }, }, }, - (err) => { + }, + (err) => { + assert.ifError(err); + amqputils.publishMessage('e1', namespace, 'test message', {}, (err) => { assert.ifError(err); - amqputils.publishMessage('e1', namespace, 'test message', {}, (err) => { - assert.ifError(err); - amqputils.assertMessage('q1', namespace, 'test message', done); - }); + amqputils.assertMessage('q1', namespace, 'test message', done); + }); + }, + ); + }); + + it('should reconnect on error', (test, done) => { + createBroker({ + vhosts: { + '/': { + queues: ['q'], }, - ); - }); + }, + }, (err, broker) => { + assert.ifError(err); - function createBroker(config, next) { - config = _.defaultsDeep(config, testConfig); - Broker.create(config, (err, _broker) => { - broker = _broker; - next(err, broker); + broker.on('error', (err) => { + assert.equal(err.message, 'Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - VHOST TEST"'); + broker.on('connect', () => done()); }); - } - }, - { timeout: 2000 }, -); + + amqputils.waitForConnections((err, connections) => { + amqputils.closeConnections(connections, 'VHOST TEST', (err) => { + assert.ifError(err); + }); + }); + }); + }, { timeout: 10000 }); + + function createBroker(config, next) { + config = _.defaultsDeep(config, testConfig); + Broker.create(config, (err, _broker) => { + broker = _broker; + next(err, broker); + }); + } +}, { timeout: 2000 });