"channel closed" when publishing before the reconnection is done #369

Open
aclarembeau opened this issue Jul 1, 2024 · 0 comments

Hello,

We have been using this awesome library for a while, and we recently observed some strange behavior that causes us to lose a few AMQP messages. Could you help us look into it?

We suspect that if a connection is dropped and we try to publish a message between the moment the native AMQP connection is lost and the moment the wrapper receives the notification, publishing to the channel fails.

This causes a call like this:

channel.publish(...)

to fail with an error like:

Error: channel closed
    at ConfirmChannel.<anonymous> (/app/node_modules/amqplib/lib/channel.js:43:14)
    at ConfirmChannel.emit (node:events:531:35)
    at ConfirmChannel.emit (node:domain:551:15)
    at ConfirmChannel.emit (/app/node_modules/@opentelemetry/instrumentation-amqplib/build/src/amqplib.js:144:29)
    at ConfirmChannel.toClosed (/app/node_modules/amqplib/lib/channel.js:154:10)
    at Connection._closeChannels (/app/node_modules/amqplib/lib/connection.js:340:20)
    at Connection.toClosed (/app/node_modules/amqplib/lib/connection.js:347:10)
    at Connection.onSocketError (/app/node_modules/amqplib/lib/connection.js:311:12)
    at TLSSocket.emit (node:events:531:35)
    at TLSSocket.emit (node:domain:488:12)
    at emitErrorNT (node:internal/streams/destroy:169:8)
    at emitErrorCloseNT (node:internal/streams/destroy:128:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)

This can be reproduced quite easily.
Just set up a local AMQP server, for example with:

docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management

And run two pieces of code:

A server:

const amqp = require('amqp-connection-manager');

// Create a new connection manager
const connection = amqp.connect(['amqp://localhost']);

// Create a channel wrapper
const channelWrapper = connection.createChannel({
  json: true,
  setup: channel => channel.assertQueue('task_queue', { durable: true })
});

console.log('Waiting for messages in task_queue. To exit press CTRL+C');

channelWrapper.addSetup(channel => {
  return channel.consume('task_queue', msg => {
    const content = msg.content.toString();
    console.log("Received: ", content);

    // Simulate some work
    setTimeout(() => {
      console.log("Processed: ", content);
      channel.ack(msg);

      // Send a response back
      channel.sendToQueue(msg.properties.replyTo,
        Buffer.from(`Processed: ${content}`), {
          correlationId: msg.properties.correlationId
        });
    }, 500);
  });
});

A client:

const amqp = require('amqp-connection-manager');
const { v4: uuidv4 } = require('uuid');

// Create a new connection manager
const connection = amqp.connect(['amqp://localhost']);
connection.on('connect', () => {console.log('connect')})
connection.on('disconnect', () => {console.log('disconnect')})

// Create a channel wrapper
const channelWrapper = connection.createChannel({
  json: true,
  setup: channel => {

    return channel.assertQueue("", { exclusive: true });
  }
});


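const sendMessage = (message) => {
  const correlationId = uuidv4();
  const replyQueue = '';

  // Return the promise so the caller's `await` actually waits for the publish
  // and sees a rejection if it fails.
  return channelWrapper.sendToQueue('task_queue', message, {
    correlationId: correlationId,
    replyTo: replyQueue
  });
};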
(async () => {
  for (let i = 0; i < 10000; i++) {
    const message = `Message #${i + 1}`;
    console.log("Sending: ", message);
    const response = await sendMessage(message);
    await new Promise(resolve => setTimeout(resolve, 10));

  }
  await new Promise(resolve => setTimeout(resolve, 10));
  connection.close();
})();

Disconnect and reconnect the container running RabbitMQ several times while the client is sending. After a few attempts, sendMessage fails.

We suspect this is caused by a race condition that may not be simple to fix. It happens in a scenario like this:

1. A network incident happens.
2. The RabbitMQ connection is dropped.
3. The channel is closed.
   <= If we publish here, it fails.
4. The lib handles the close event.
5. The lib reconnects the connection and the channel.

Do you think there is a way around this?
For the moment, we are thinking about retrying publishes when they fail, but it would be better to have this built into the library.
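
To make the retry workaround concrete, here is a rough sketch of what we have in mind. The helper name `publishWithRetry` and its `retries` and `delayMs` options are our own hypothetical names, not part of amqp-connection-manager, and the back-off values are arbitrary. It only assumes that the publish call returns a promise that rejects on failure:

```javascript
// Hypothetical helper, not part of amqp-connection-manager.
// `publish` is any function returning a promise, for example:
//   () => channelWrapper.sendToQueue('task_queue', message, options)
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function publishWithRetry(publish, { retries = 3, delayMs = 100 } = {}) {
  let lastError;
  // One initial attempt plus up to `retries` further attempts.
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await publish();
    } catch (err) {
      lastError = err;
      // Wait a little longer after each failure so the wrapper
      // has a chance to reconnect before the next attempt.
      if (attempt < retries) {
        await sleep(delayMs * (attempt + 1));
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

In the client above it would be used as `await publishWithRetry(() => channelWrapper.sendToQueue('task_queue', message, { correlationId, replyTo: replyQueue }))`. One caveat: if the first publish reached the broker before the connection dropped, a retry sends the message twice, so consumers would need to tolerate duplicates.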

Thank you very much
