You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
/**
 * Start consuming messages from a queue bound to a direct exchange.
 *
 * Opens a connection to AMQP_URL, then creates a channel whose setup asserts
 * the durable direct exchange `ex`, asserts the non-exclusive queue `qname`,
 * binds the queue to the exchange with routing key `msgKey`, and registers a
 * consumer on the queue.
 *
 * Each delivered message is acknowledged first and then handed to `invkFn`,
 * so a handler failure does not cause redelivery.
 *
 * @param {string} ex - Name of the exchange to assert and bind to.
 * @param {string} qname - Name of the queue to assert and consume from.
 * @param {string} msgKey - Routing key used for the queue binding.
 * @param {Function} invkFn - Called with every delivered message; may be async.
 * @returns {Promise<void>} Resolves once the channel has been created; it does
 *   not wait for the broker connection to be established.
 */
module.exports.consume = async function (ex, qname, msgKey, invkFn) {
  const connection = await amqp.connect(AMQP_URL);

  const onMessage = async data => {
    // amqplib invokes the consumer callback with null when the broker cancels
    // the consumer (e.g. the queue was deleted); there is nothing to ack then.
    if (data === null) {
      console.log("[WARN] Consumer on %s was cancelled by the broker", qname);
      return;
    }

    channelWrapper.ack(data);

    // Call the function to be invoked on receipt of a message. A throwing or
    // rejecting handler is logged here so it cannot escape into the library's
    // message dispatch or surface as an unhandled rejection.
    try {
      await invkFn(data);
    } catch (err) {
      console.log("[ERROR] Message handler failed:", err);
    }
  };

  // Set up a channel listening for messages in the queue.
  const channelWrapper = connection.createChannel({
    setup: channel =>
      Promise.all([
        channel.assertExchange(ex, 'direct', { durable: true }),
        channel.assertQueue(qname, { exclusive: false }), // , autoDelete: true
        channel.bindQueue(qname, ex, msgKey),
        channel.consume(qname, onMessage)
      ])
  });

  // Deliberately not awaited: the caller is not blocked while the connection
  // is being established. The rejection handler keeps the promise from floating.
  channelWrapper.waitForConnect()
    .then(function () {
      console.log("[INFO] Waiting for messages in %s. To exit press CTRL+C", qname);
    })
    .catch(err => {
      console.log("[ERROR] Failed waiting for channel on %s:", qname, err);
    });
};
This is my code, please help!
The text was updated successfully, but these errors were encountered:
I am having a similar problem. It looks like it happens after lots of mirroring of messages inside the cluster; I think the channel state was not updated for some reason.
"use strict";
// AMQP client library; its connect() is what publish() and consume() call.
const amqp = require('amqp-connection-manager');
// Project-local module that supplies the broker URL (implementation not shown here).
const mqcon = require('./mqConnection');
// Broker URL, resolved once at module load and passed to every amqp.connect() call.
const AMQP_URL = mqcon.getAMQPURL();
/**
 * Forward a successful result to `_respond` with status 'success' and code 200.
 *
 * NOTE(review): `_respond` is not defined in the visible part of this file;
 * confirm it is in scope where this module is loaded.
 *
 * @param {*} res - Passed through to `_respond` unchanged.
 * @param {*} next - Passed through to `_respond` unchanged.
 * @param {*} data - Passed through to `_respond` unchanged.
 */
module.exports.success = function success(res, next, data) {
  const HTTP_OK = 200;
  _respond(res, next, 'success', data, HTTP_OK);
};
/**
 * Forward a failed result to `_respond` with status 'failure' and the
 * caller-supplied code.
 *
 * NOTE(review): `_respond` is not defined in the visible part of this file;
 * confirm it is in scope where this module is loaded.
 *
 * @param {*} res - Passed through to `_respond` unchanged.
 * @param {*} next - Passed through to `_respond` unchanged.
 * @param {*} data - Passed through to `_respond` unchanged.
 * @param {*} http_code - Passed to `_respond` as its final argument.
 */
module.exports.failure = function failure(res, next, data, http_code) {
  const status = 'failure';
  _respond(res, next, status, data, http_code);
};
/**
 * Publish one message to the durable direct exchange 'taskch'.
 *
 * A dedicated connection and channel are opened for the call and are always
 * closed again before the returned promise settles, so repeated calls do not
 * accumulate open connections.
 *
 * A failed publish is logged and otherwise swallowed (best effort): the
 * returned promise still resolves in that case.
 *
 * @param {string} msgKey - Routing key to publish with.
 * @param {*} msgPayload - Message body; converted with Buffer.from(), so it
 *   must be a value Buffer.from() accepts (otherwise the promise rejects).
 * @returns {Promise<void>}
 */
module.exports.publish = async function (msgKey, msgPayload) {
  const exch = 'taskch';
  const connection = await amqp.connect(AMQP_URL);

  try {
    // Converted outside the publish try/catch so an invalid payload still
    // rejects the call instead of being logged as a broker rejection.
    const body = Buffer.from(msgPayload);

    // Create a channel wrapper
    const channelWrapper = connection.createChannel({
      setup: channel => channel.assertExchange(exch, 'direct', { durable: true })
    });

    try {
      await channelWrapper.publish(exch, msgKey, body);
    } catch (err) {
      console.log("[ERROR] Message was rejected:", err.stack);
    } finally {
      await channelWrapper.close();
    }
  } finally {
    // Release the per-call connection on success as well as on failure.
    await connection.close();
  }
};
/**
 * Start consuming messages from a queue bound to a direct exchange.
 *
 * Opens a connection to AMQP_URL, then creates a channel whose setup asserts
 * the durable direct exchange `ex`, asserts the non-exclusive queue `qname`,
 * binds the queue to the exchange with routing key `msgKey`, and registers a
 * consumer on the queue.
 *
 * Each delivered message is acknowledged first and then handed to `invkFn`,
 * so a handler failure does not cause redelivery.
 *
 * @param {string} ex - Name of the exchange to assert and bind to.
 * @param {string} qname - Name of the queue to assert and consume from.
 * @param {string} msgKey - Routing key used for the queue binding.
 * @param {Function} invkFn - Called with every delivered message; may be async.
 * @returns {Promise<void>} Resolves once the channel has been created; it does
 *   not wait for the broker connection to be established.
 */
module.exports.consume = async function (ex, qname, msgKey, invkFn) {
  const connection = await amqp.connect(AMQP_URL);

  const onMessage = async data => {
    // amqplib invokes the consumer callback with null when the broker cancels
    // the consumer (e.g. the queue was deleted); there is nothing to ack then.
    if (data === null) {
      console.log("[WARN] Consumer on %s was cancelled by the broker", qname);
      return;
    }

    channelWrapper.ack(data);

    // Call the function to be invoked on receipt of a message. A throwing or
    // rejecting handler is logged here so it cannot escape into the library's
    // message dispatch or surface as an unhandled rejection.
    try {
      await invkFn(data);
    } catch (err) {
      console.log("[ERROR] Message handler failed:", err);
    }
  };

  // Set up a channel listening for messages in the queue.
  const channelWrapper = connection.createChannel({
    setup: channel =>
      Promise.all([
        channel.assertExchange(ex, 'direct', { durable: true }),
        channel.assertQueue(qname, { exclusive: false }), // , autoDelete: true
        channel.bindQueue(qname, ex, msgKey),
        channel.consume(qname, onMessage)
      ])
  });

  // Deliberately not awaited: the caller is not blocked while the connection
  // is being established. The rejection handler keeps the promise from floating.
  channelWrapper.waitForConnect()
    .then(function () {
      console.log("[INFO] Waiting for messages in %s. To exit press CTRL+C", qname);
    })
    .catch(err => {
      console.log("[ERROR] Failed waiting for channel on %s:", qname, err);
    });
};
This is my code, please help!
The text was updated successfully, but these errors were encountered: