Skip to content

Commit

Permalink
Work related to ##232
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Apr 12, 2024
1 parent 0a58d1c commit 7e62644
Show file tree
Hide file tree
Showing 8 changed files with 2,672 additions and 2,512 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log

## 19.0.0
- I am not aware of any breaking changes in this release, but emitting error events asynchronously could have subtle side effects, hence the major release
- Deprecate session 'cancelled' event in favour of 'cancel' (both will work)
- Refactor reconnection and resubscription code
- Emit errors asynchronously to prevent them being caught by the amqplib main accept loop
- Fix bug which throw an exception in the error handler when a close event was emitted with no error argument

## 18.0.1

- Removed console.log when the channel pool destroyed a channel
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ The reason Rascal nacks the message is because the alternatives are to leave the

## Very Important Section About Event Handling

[amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. There are four places where you should do this
[amqplib](https://www.npmjs.com/package/amqplib) emits error events when a connection or channel encounters a problem. Rascal will listen for these and provided you use the default configuration will attempt automatic recovery (reconnection etc), however these events can indicate errors in your code, so it's also important to bring them to your attention. Rascal does this by re-emitting the error event, which means if you don't handle them, they will bubble up to the uncaught error handler and crash your application. It is insufficient to register a global uncaughtException handler - doing so without registering individual handlers will prevent your application from crashing, but also prevent Rascal from recovering.

There are four places where you need to register error handlers.

1. Immediately after obtaining a broker instance

Expand Down Expand Up @@ -1358,7 +1360,7 @@ If the message has not been auto-acknowledged you should ackOrNack it. **If you
The RabbitMQ broker may [cancel](https://www.rabbitmq.com/consumer-cancel.html) the consumer if the queue is deleted or the node on which the queue is located fails. [amqplib](https://www.squaremobius.net/amqp.node/channel_api.html#channel_consume) handles this by delivering a `null` message. When Rascal receives the null message it will
1. Emit a `cancelled` event from the subscription.
1. Emit a `cancel` event from the subscription.
1. Emit an `error` event from the subscription if the `cancel` event was not handled
1. Optionally attempt to resubscribe as per normal retry configuration. If the queue was deleted rather than being failed over, the queue will not automatically be re-created and retry attempts will fail indefinitely.
Expand Down
56 changes: 37 additions & 19 deletions lib/amqp/Publication.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,27 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn,
session._removePausedListener();
if (err) return session.emit('error', err, messageId);
if (session.isAborted()) return abortPublish(channel, messageId);
const errorHandler = _.once(handleChannelError.bind(null, channel, messageId, session, config));

const disconnectionHandler = makeDisconnectionHandler(channel, messageId, session, config);
const returnHandler = session.emit.bind(session, 'return');
addListeners(channel, errorHandler, returnHandler);
addListeners(channel, disconnectionHandler, returnHandler);

try {
session._startPublish();

publishFn(channel, buffer, publishConfig, (err, ok) => {
session._endPublish();
if (err) {
destroyChannel(channel, errorHandler, returnHandler);
destroyChannel(channel, disconnectionHandler, returnHandler);
return session.emit('error', err, messageId);
}

ok ? returnChannel(channel, errorHandler, returnHandler) : deferReturnChannel(channel, errorHandler, returnHandler);
ok ? returnChannel(channel, disconnectionHandler, returnHandler) : deferReturnChannel(channel, disconnectionHandler, returnHandler);

session.emit('success', messageId);
});
} catch (err) {
returnChannel(channel, errorHandler, returnHandler);
returnChannel(channel, disconnectionHandler, returnHandler);
return session.emit('error', err, messageId);
}
});
Expand All @@ -125,19 +128,19 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn,
returnChannelFn(channel);
}

function returnChannel(channel, errorHandler, returnHandler) {
removeListeners(channel, errorHandler, returnHandler);
function returnChannel(channel, disconnectionHandler, returnHandler) {
removeListeners(channel, disconnectionHandler, returnHandler);
returnChannelFn(channel);
}

function deferReturnChannel(channel, errorHandler, returnHandler) {
function deferReturnChannel(channel, disconnectionHandler, returnHandler) {
channel.once('drain', () => {
returnChannel(channel, errorHandler, returnHandler);
returnChannel(channel, disconnectionHandler, returnHandler);
});
}

function destroyChannel(channel, errorHandler, returnHandler) {
removeListeners(channel, errorHandler, returnHandler);
function destroyChannel(channel, disconnectionHandler, returnHandler) {
removeListeners(channel, disconnectionHandler, returnHandler);
destroyChannelFn(channel);
}

Expand All @@ -163,19 +166,29 @@ function Publication(vhost, borrowChannelFn, returnChannelFn, destroyChannelFn,
}
}

function addListeners(channel, errorHandler, returnHandler) {
channel.on('error', errorHandler);
function makeDisconnectionHandler(channel, messageId, session, config) {
return _.once((err) => {
// Use setImmediate to avoid amqplib accept loop swallowing errors
setImmediate(() => (err
// Treat close events with errors as error events
? handleChannelError(channel, messageId, session, config, err)
: handleChannelClose(channel, messageId, session, config)));
});
}

function addListeners(channel, disconnectionHandler, returnHandler) {
channel.on('error', disconnectionHandler);
channel.on('return', returnHandler);
channel.connection.once('error', errorHandler);
channel.connection.once('close', errorHandler);
channel.connection.once('error', disconnectionHandler);
channel.connection.once('close', disconnectionHandler);
}

function removeListeners(channel, errorHandler, returnHandler) {
function removeListeners(channel, disconnectionHandler, returnHandler) {
channel.removeAllListeners('drain');
channel.removeListener('error', errorHandler);
channel.removeListener('error', disconnectionHandler);
channel.removeListener('return', returnHandler);
channel.connection.removeListener('error', errorHandler);
channel.connection.removeListener('close', errorHandler);
channel.connection.removeListener('error', disconnectionHandler);
channel.connection.removeListener('close', disconnectionHandler);
}

function publishToExchange(channel, content, config, next) {
Expand Down Expand Up @@ -252,3 +265,8 @@ function handleChannelError(borked, messageId, emitter, config, err) {
debug('Channel error: %s during publication of message: %s to %s using channel: %s', err.message, messageId, config.name, borked._rascal_id);
emitter.emit('error', err, messageId);
}

function handleChannelClose(borked, messageId, emitter, config) {
debug('Channel closed during publication of message: %s to %s using channel: %s', messageId, config.name, borked._rascal_id);
emitter.emit('close', messageId);
}
85 changes: 50 additions & 35 deletions lib/amqp/Subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
_configureQos(config, channel, (err) => {
if (err) return done(err);

const removeErrorHandlers = attachErrorHandlers(channel, session, config);
const onMessage = _onMessage.bind(null, session, config, removeErrorHandlers);
const removeDisconnectionHandlers = attachDisconnectionHandlers(channel, session, config);
const onMessage = _onMessage.bind(null, session, config, removeDisconnectionHandlers);

channel.consume(config.source, onMessage, config.options, (err, response) => {
if (err) {
debug('Error subscribing to %s using channel: %s. %s', config.source, channel._rascal_id, err.message);
removeErrorHandlers();
removeDisconnectionHandlers();
return done(err);
}
session._open(channel, response.consumerTag, (err) => {
Expand All @@ -88,8 +88,8 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
async.series(qos, next);
}

function _onMessage(session, config, removeErrorHandlers, message) {
if (!message) return handleConsumerCancel(session, config, removeErrorHandlers);
function _onMessage(session, config, removeDisconnectionHandlers, message) {
if (!message) return handleConsumerCancel(session, config, removeDisconnectionHandlers);

debug('Received message: %s from queue: %s', message.properties.messageId, config.queue);
session._incrementUnacknowledgeMessageCount(message.fields.consumerTag);
Expand Down Expand Up @@ -249,51 +249,66 @@ function Subscription(broker, vhost, subscriptionConfig, counter) {
if (err) session.emit('error', err);
}

function attachErrorHandlers(channel, session, config) {
function attachDisconnectionHandlers(channel, session, config) {
/* eslint-disable no-use-before-define */
const connection = channel.connection;
const removeErrorHandlers = _.once(() => {
channel.removeListener('error', errorHandler);
const removeDisconnectionHandlers = _.once(() => {
channel.removeListener('error', disconnectionHandler);
channel.on('error', (err) => {
debug('Suppressing error on cancelled session: %s to prevent connection errors. %s', channel._rascal_id, err.message);
});
connection.removeListener('error', errorHandler);
connection.removeListener('close', errorHandler);
connection.removeListener('error', disconnectionHandler);
connection.removeListener('close', disconnectionHandler);
});

const disconnectionHandler = makeDisconnectionHandler(session, config, removeDisconnectionHandlers);
channel.on('error', disconnectionHandler);
connection.once('error', disconnectionHandler);
connection.once('close', disconnectionHandler);
return removeDisconnectionHandlers;
}

function makeDisconnectionHandler(session, config, removeDisconnectionHandlers) {
return _.once((err) => {
// Use setImmediate to avoid amqplib accept loop swallowing errors
setImmediate(() => (err
// Treat close events with errors as error events
? handleChannelError(session, config, removeDisconnectionHandlers, 0, err)
: handleChannelClose(session, config, removeDisconnectionHandlers, 0)));
});
const errorHandler = _.once(handleChannelError.bind(null, session, config, removeErrorHandlers, 0));
channel.on('error', errorHandler);
connection.once('error', errorHandler);
connection.once('close', errorHandler);
return removeErrorHandlers;
}

function handleChannelError(session, config, removeErrorHandlers, attempts, err) {
function handleChannelError(session, config, removeDisconnectionHandler, attempt, err) {
debug('Handling channel error: %s from %s using channel: %s', err.message, config.name, session._getRascalChannelId());
if (removeErrorHandlers) removeErrorHandlers();
if (removeDisconnectionHandler) removeDisconnectionHandler();
session.emit('error', err);
config.retry
&& subscribeNow(session, config, (err) => {
if (!err) return;
const delay = timer.next();
debug('Will attempt resubscription(%d) to %s in %dms', attempts + 1, config.name, delay);
session._schedule(handleChannelError.bind(null, session, config, null, attempts + 1, err), delay);
});
retrySubscription(session, config, attempt + 1);
}

function handleConsumerCancel(session, config, removeErrorHandlers) {
function handleChannelClose(session, config, removeDisconnectionHandler, attempt) {
debug('Handling channel close from %s using channel: %s', config.name, session._getRascalChannelId());
removeDisconnectionHandler();
session.emit('close');
retrySubscription(session, config, attempt + 1);
}

function handleConsumerCancel(session, config, removeDisconnectionHandler) {
debug('Received consumer cancel from %s using channel: %s', config.name, session._getRascalChannelId());
removeErrorHandlers();
removeDisconnectionHandler();
const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name));
session.emit('cancelled', cancelErr) || session.emit('cancel', cancelErr) || session.emit('error', cancelErr);
session._close((err) => {
if (err) debug('Error cancelling subscription: %s', err.message);
const cancelErr = new Error(format('Subscription: %s was cancelled by the broker', config.name));
session.emit('cancelled', cancelErr) || session.emit('error', cancelErr);
config.retry
&& subscribeNow(session, config, (err) => {
if (!err) return;
const delay = timer.next();
debug('Will attempt resubscription(%d) to %s in %dms', 1, config.name, delay);
session._schedule(handleChannelError.bind(null, session, config, null, 1, err), delay);
});
retrySubscription(session, config, 1);
});
}

function retrySubscription(session, config, attempt) {
config.retry && subscribeNow(session, config, (err) => {
if (!err) return;
const delay = timer.next();
debug('Will attempt resubscription(%d) to %s in %dms', attempt, config.name, delay);
session._schedule(handleChannelError.bind(null, session, config, null, attempt, err), delay);
});
}
}
46 changes: 34 additions & 12 deletions lib/amqp/Vhost.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function Vhost(vhostConfig, components) {
connectionConfig = ctx.connectionConfig;
timer = backoff(ctx.connectionConfig.retry);

attachErrorHandlers(config);
attachDisconnectionHandlers(config);
forwardRabbitMQConnectionEvents();
ensureChannelPools();
resumeChannelAllocation();
Expand Down Expand Up @@ -471,11 +471,21 @@ function Vhost(vhostConfig, components) {
);
}

function attachErrorHandlers(config) {
function attachDisconnectionHandlers(config) {
connection.removeAllListeners('error');
const errorHandler = _.once(handleConnectionError.bind(null, connection, config));
connection.on('error', errorHandler);
connection.on('close', errorHandler);
const disconectionHandler = makeDisconnectionHandler(config);
connection.on('error', disconectionHandler);
connection.on('close', disconectionHandler);
}

function makeDisconnectionHandler(config) {
return _.once((err) => {
// Use setImmediate to avoid amqplib accept loop swallowing errors
setImmediate(() => (err
// Treat close events with errors as error events
? handleConnectionError(connection, config, err)
: handleConnectionClose(connection, config)));
});
}

function handleConnectionError(borked, config, err) {
Expand All @@ -484,12 +494,24 @@ function Vhost(vhostConfig, components) {
connection = undefined;
self.emit('disconnect');
self.emit('error', err, self.getConnectionDetails());
connectionConfig.retry
&& self.init((err) => {
if (!err) return;
const delay = timer.next();
debug('Will attempt reconnection in in %dms', delay);
reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay);
});
retryConnection(borked, config);
}

function handleConnectionClose(borked, config) {
debug('Handling connection close initially from connection: %s, %s', borked._rascal_id, connectionConfig.loggableUrl);
pauseChannelAllocation();
connection = undefined;
self.emit('disconnect');
self.emit('close', self.getConnectionDetails());
retryConnection(borked, config);
}

function retryConnection(borked, config) {
connectionConfig.retry && self.init((err) => {
if (!err) return;
const delay = timer.next();
debug('Will attempt reconnection in in %dms', delay);
reconnectTimeout = setTimeoutUnref(handleConnectionError.bind(null, borked, config, err), delay);
});
}
}
Loading

0 comments on commit 7e62644

Please sign in to comment.