Multiple channels are getting created if channel errors out due to a pre-condition error #294

Open
divijklenty opened this issue Sep 26, 2022 · 0 comments

Hi,
Multiple channels are opened when a channel closes due to an error (I have reproduced it with a 406 pre-condition error). I had to handle the recreation of an errored channel myself because amqp-connection-manager wasn't triggering a reconnection.

The consumer.js file below keeps acknowledging the messages it receives. When I close the connection from the RabbitMQ console, it reconnects successfully and restores all the channels, but closing them again throws a 406 pre-condition error, which I assume happens because I'm trying to acknowledge a message on a different channel from the one it was delivered on. From this point onwards, a new channel is opened every time the channel closes due to an error.
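To illustrate the assumption about acknowledging on a different channel: in AMQP 0-9-1, delivery tags are scoped to the channel a message was delivered on, and RabbitMQ responds to an ack for an unknown tag with a 406 PRECONDITION_FAILED channel error. The block below is a toy model only. `ToyChannel` is a hypothetical stand-in, not the API of amqplib or amqp-connection-manager, and it assumes the handler ends up acking on a channel created after the reconnect.

```javascript
// Toy model only: ToyChannel is a hypothetical stand-in, not amqplib's Channel.
class ToyChannel {
  constructor(id) {
    this.id = id;
    this.nextTag = 1;          // delivery tags start at 1 on every channel
    this.unacked = new Set();  // tags this channel has delivered but not yet acked
    this.closed = false;
  }

  // Simulate the broker delivering a message on this channel.
  deliver(content) {
    const deliveryTag = this.nextTag++;
    this.unacked.add(deliveryTag);
    return { content, fields: { deliveryTag } };
  }

  // An ack succeeds only for a tag this channel delivered and has not acked yet.
  ack(msg) {
    const tag = msg.fields.deliveryTag;
    if (this.closed) throw new Error('channel closed');
    if (!this.unacked.delete(tag)) {
      this.closed = true; // a channel-level error closes the channel
      const err = new Error(`PRECONDITION_FAILED - unknown delivery tag ${tag}`);
      err.code = 406;
      throw err;
    }
  }
}

const oldChannel = new ToyChannel('before-reconnect');
const inFlight = oldChannel.deliver('job-1'); // the handler is still working on this one

const newChannel = new ToyChannel('after-reconnect'); // fresh channel, nothing delivered yet

let ackError = null;
try {
  newChannel.ack(inFlight); // tag 1 was never delivered on newChannel
} catch (err) {
  ackError = err;
}
console.log(ackError.code, newChannel.closed);
```

In this model the late ack both fails and closes the new channel, which would match the repeated channel errors described above if the same thing happens against the real broker.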

const amqpManager = require('amqp-connection-manager');

(() => {
	const connection = amqpManager.connect(<amqp_url>, {reconnectTimeInSeconds: 3, connectionOptions: {clientProperties: { connection_name: 'test consumer' }}});

	connection.on('connect', () => console.log('connected'));
	connection.on('disconnect', (err) => console.log('disconnected ', err));

	const wait = (ms) =>  new Promise(resolve => setTimeout(resolve, ms));

	class Consumer {
		constructor(connection, queueName) {
			this.connection = connection;
			this.queueName = queueName;
			this.isChannelOpen = {};

			return this.createConsumer();
		}

		async consume(ch, msg) {
			const msgNew = JSON.parse(msg.content.toString());
			await wait(4000);

			try {
				/* Only want to acknowledge message if channel is open otherwise wait for message timeout to requeue the message */
				if (ch.isOpen()) {
					console.log('msg consumed', msgNew.data.count);
					return ch.ack(msg);
				} else {
					console.log('Channel is not open, not acknowledging messages');
				}
			} catch (err) {
				console.log('err in consume', err);
			}
		};

		createConsumer() {
			let _this = this;

			const channelConnectFunction = async (ch) => {
				try {
					// await channelWrapper.waitForConnect();
					_this.isChannelOpen[_this.queueName] = true;
					const { consumerTag } = await ch.consume(_this.queueName, _this.consume.bind(_this, channelWrapper));
					console.log(`Starting consumer for queue ${_this.queueName} | consumerTag ${consumerTag}`);
					return consumerTag;
				} catch (error) {
					console.log('Error in channel connect function ', error);
				}
			};

			const retry = (ms) => setTimeout(() => {
				if (!_this.isChannelOpen[_this.queueName]) {
					console.log("Retrying creation of consumers for", _this.queueName);
					return _this.createConsumer();
				} else {
					console.log("Skipping channel creation for queue", _this.queueName);
				}
			}, ms);

			const channelWrapper = _this.connection.createChannel({
				name: _this.queueName,
				confirm: false,
				setup: async (ch) => {
					ch.on("close", () => {
						console.log('[AMQPLIB] Closing channel, setting channelAvailable to false');
						_this.isChannelOpen[_this.queueName] = false;
					});

					ch.on("error", async (err) => {
						_this.isChannelOpen[_this.queueName] = false;
						console.log('[AMQPLIB] Channel error event triggered.');
						console.log(err);
						/* recreating consumer if channel errors out due to a pre-condition error (or any other error which kills the channel but keeps the connection alive) */
						retry.call(_this, 8000);
					});

					ch.isOpen = () => _this.isChannelOpen[_this.queueName];

					return Promise.all([
						ch.assertQueue(_this.queueName, { durable: true }),
						ch.prefetch(100),
						channelConnectFunction(ch)
					]);
				}
			});

			channelWrapper.on('error', (error, { name }) => {
				console.log(`Channel ${name} errored`);
				console.log(error);
			});

			channelWrapper.on('close', () => console.log('Consumer channel closed'));

			channelWrapper.on("connect", () => console.log(`Successfully connected to channel for queue ${_this.queueName}`));

			channelWrapper.isOpen = () => _this.isChannelOpen[_this.queueName];

			return channelWrapper;
		};
	}

	const QUEUES = [
		'amqp-connection-manager-sample',
	];

	for (const queue of QUEUES) {
		const consumer = new Consumer(connection, queue);
	}
})();
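One possible reading of the code above is that each `retry` calls `createConsumer()` again, which asks the connection for a brand-new channel wrapper while the previous wrapper is never closed, so the wrappers accumulate. The sketch below shows one way to keep at most one wrapper per queue by closing the previous one first. `FakeWrapper`, `FakeConnection` and `SingleChannelPerQueue` are hypothetical stand-ins so the example runs without a broker; only the `createChannel({ name })` and `close()` call shapes are borrowed from the code above and from amqp-connection-manager, and whether this resolves the reported behaviour is an assumption.

```javascript
// Hypothetical stand-ins so the sketch runs without a broker.
class FakeWrapper {
  constructor(name, registry) {
    this.name = name;
    this.registry = registry;
    registry.add(this); // the connection tracks every wrapper it has created
  }

  // Modelled as async here; this is a stand-in, not the library's implementation.
  async close() {
    this.registry.delete(this);
  }
}

class FakeConnection {
  constructor() {
    this.openWrappers = new Set();
  }

  createChannel({ name }) {
    return new FakeWrapper(name, this.openWrappers);
  }
}

// Keeps at most one wrapper per queue: the previous one is closed before a new one is created.
class SingleChannelPerQueue {
  constructor(connection) {
    this.connection = connection;
    this.wrappers = new Map(); // queueName -> current wrapper
  }

  async recreate(queueName) {
    const previous = this.wrappers.get(queueName);
    if (previous) {
      await previous.close();
    }
    const wrapper = this.connection.createChannel({ name: queueName });
    this.wrappers.set(queueName, wrapper);
    return wrapper;
  }
}

async function demo() {
  const connection = new FakeConnection();
  const manager = new SingleChannelPerQueue(connection);
  // Simulate three channel errors in a row, each one triggering a recreation.
  for (let i = 0; i < 3; i++) {
    await manager.recreate('amqp-connection-manager-sample');
  }
  return connection.openWrappers.size;
}
```

Calling `demo()` resolves to the number of wrappers still registered with the fake connection after the three recreations.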