-
Notifications
You must be signed in to change notification settings - Fork 475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Auto-reconnection #25
Comments
Recovery of AMQP 0-9-1 connections involves more than simply reconnecting. You may want to get familiar with what other clients do. Fortunately, there are good examples It's a lot more involved for producers as socket issues may be detected in several seconds. As such, not only |
For example, Hutch adds a disk-based write-ahead log on top of Bunny among other things. |
Would it not make sense to roll that into the parent library? Maybe not disk-based, but a write-ahead log? |
When the connection drops you are pretty much hosed, and usually the best thing to do is start from scratch -- assert all your exchanges and queues again and go from there. There are some things the client can do for you (like Michael's Ruby client); but I am rather wary of second-guessing the application code. You might want such-and-such a queue to disappear with the connection. Regarding messages, yes you can use confirmations (now with callbacks!), which can change your guarantee from "at most once" (a message might not make it, and you won't know) to "at least once" (you might not get a confirmation, and so resend a message). (Aside: Do you guys get github emails from the future or something? You always have about three posts back and forth before I manage to comment ..) |
RAM-based write-ahead log is not an option for people who cannot afford to lose a single message (such as a couple of payment processors that are known to use Bunny). Others often simply don't need a WAL. In any case, I'm not at all convinced it should be part of client libraries. amqplib should try to provide about as much as Ruby clients do. Those provide over 2 years of automatic connection recovery experience to learn from. |
It'd make an interesting example, at least -- put a buffer in front of writes, and recover from that in the case of connection drops. The usual example for guaranteed delivery is moving rows from a database (or files from a filesystem), so the "reliable storage" bit is implicit in the scenario. |
In every case I can think of, the starting from scratch can be taken care of by the library, and if you want a transient queue or exchange you define it as such with Those steps, however, would quickly become convoluted and difficult to get right in an abstract enough manner to suit all users.
That's a good point. For us, we need nearly as much reliability, so RAM-based might not make sense (but what happens if the hard drive fails and causes the system to go down? Then you're really hosed. That said, I feel like it might be beneficial for those who don't need quite as much reliability.
That's more or less the extended version of what I was attempting to communicate in our other discussion. Instead of just keeping a buffer of unsent commands, keep a buffer of unacknowledged commands. This would allow one to publish transparently through the library and assume that it will reach its destination--provided the producer doesn't crash. |
I don't know the right answer. Auto-reconnection is a feature many database libraries support, though this isn't a database library. Regardless, in my ideal world the end-developer wouldn't need to worry about reliability as much. Reconnections are common enough that it seems a useful feature.
Yup, on every single one. Make sure you're watching the thread--it's a button at the bottom of issue pages. |
Nobody is saying that reconnection should not be supported, just that it is not as trivial as recovering TCP connection. |
Oh, absolutely. Who would you rather be in charge of recovery, the library or every developer using the library? Better to get it right once. |
If there was a definitive way to recover, the library; without that, every developer. |
Exactly. |
I'll provide my point of view as a developer who is considering switching from node-amqp to amqp.node: The lack of a reconnection feature is a barrier to me making the switch. If it was just a matter of changing the library, it would be an easy decision-- but currently it also means I need to design and implement all the reconnection logic too-- and that's enough to dissuade me from switching right now. I suspect there are other node-amqp users like me. Ideally, node.amqp would support a common-sense reconnection strategy that works for 80% of its users, and provide a way to turn it off for the other 20% who want to implement it in their own applications. That's what I'd love to see... Of course, it's easy to say that when I'm not the one who has to code it up. ;-) |
@cmoesel So the reconnection in node-amqp works for you? |
@squaremo It works well enough. I think there are leaks in there, but so far it has not negatively impacted us. |
This. You could take it a step further and provide a reconnect hook for say an 18% who have a specific reconnect strategy that fits into the reconnect envelope. Basically, on reconnect at a socket level, by default try to reconstruct the old state. If the user would prefer, give them a means to easily tap into the same information used in the default reconnection mechanism and set up the new connection. For the last, say, 2%, who need something very specialized, just let them disable it. Not strictly necessary, but providing a common structure for reconnection could help with maintaining the reconnection aspect of the project. That was a little convoluted...let me know if I can clarify something. |
This is a bad idea. It assumes that the desired state is always the same. What if, for example, queues are constructed according to received data? I would use something like a fixture; that is, a procedure that creates a known base state at each connection. You could do, for example, function fixture(conn) {
return conn.createChannel().then(function (ch) {
conn.on('error', reconnectAtSomePoint);
ch.assertQueue(TASKS);
ch.assertExchange(PUBSUB);
ch.close();
return conn;
});
}
function reconnect() {
amqp.connect(URL).then(fixture).then(main);
}
function main(conn) {
// set up consumers etc.
} There's nothing extra needed in amqplib in order to accomplish that yourself. Helpers for e.g., reconnection strategies ( |
Bunny tries to reconstruct the state it knows about with a couple of exceptions:
This works very well in practice but also relies on the fact that exchanges in queues in Bunny are objects, |
In our case, they are constructed according to received data. And once I've created them, I expect them to stay the same. It looks like this is related to the strategy Even this simple approach seems like it might work in many cases. You'd lose autoDelete entities, but you'd keep persistent ones.
That was sorta my idea for the 18% case, except that the fixture would know what the previous configuration was, if any. Not strictly required, and not necessarily useful, but could hint at some things.
That's what
Agreed, but if you just keep your Channel objects, why not use the channel ids? They already exist, just keep them! If the last state was valid--no overlapping channel ids, the new state will be too, right? |
Yes and that's how it works most of the time, but if you open and close channels all the time (not necessary but some do that to get very fine-grained error handling), at the time of recovery there may be concurrent channel id allocations. For similar cases, there are ways to "recover" a channel manually: keep the object but allocate |
Since your talking reconnections... Thoughts on supporting multiple hosts? |
I implemented a really simple reconnection strategy in my app: https://gist.github.com/benmoss/e93125d1fb3561be9276 (excuse the CoffeeScript 😄) For my use case just dropping messages is ok until reconnection is possible, though it's not hard to imagine how you could implement a buffer. |
It reads quite nicely in CoffeeScript, I reckon.
Exactly, if dropped messages did matter, you could extend it to use confirmations and a replay buffer. Although, more moving parts = more failure modes. |
That would certainly go hand-in-hand with reconnection. A typical scheme is to supply a collection of connection points (URLs, it would be here) for a cluster and try each in turn. By the way, this is the kind of thing I worry about with automatic reconnection: http://status.cloudamqp.com/incidents/56bhzt813hg9 (more info: https://twitter.com/CloudAMQP/status/455806520370798592; underlying problem: postwait/node-amqp#262). This particular issue was pretty straight-forward -- trying to redeclare queues with server-generated names will always fail -- but AMQP is full of corners like this, and that's not taking into account the idiosyncrasies of a given application. Making assumptions about the reconnection properties of queues is dangerous. That's not to say it can't be figured out. The RabbitMQ team certainly seem to think it's possible, since they (well, @michaelklishin for the most part, as I understand it) added reconnection to the Java client, and that is based, I would think, on how Bunny does it -- which presumably has worked very well in practice. I'm not comfortable with all the things the RabbitMQ Java client does for its recovery -- rebasing delivery tags, for example -- which come rather close to second-guessing the server behaviour (or, to put it another way, are tightly coupled with the server). I would prefer to require applications to be deliberate about recovery, help them where it is possible to do so without making additional assumptions, and to surface failures where it is not. |
I am not familiar with CoffeeScript, so I try to convert the https://gist.github.com/benmoss/e93125d1fb3561be9276 to JavaScript. Here it is https://gist.github.com/richzw/57177f3fecbeb921819c @squaremo and @benmoss, Could you please help me review it? Thanks in advance. I want to know what does the |
@richzw, coffeescript compiles into javascript, so if you need javascript, you don't need to manually convert it-- you can just take the result of compiling the coffeescript. Here is the result of the coffeescript you referenced above: http://pastebin.com/REPR1zP2 |
@richzw the |
@cmoesel and @benmoss thanks for your help. @benmoss, I want to publish message to amqp if the channel is ok, otherwise, continue to reconnect to amqp until the connection is ok. so
It seems works well. Now I want to establish one permanent connection with amqp when my apps starts, then call |
@richzw yeah, just create an instance of the MessageBus in your app startup and use it as a global object. i've seen this done by having a // globals.js
module.exports = {
messageBus: new MessageBus()
}; // somewhere_else.js
var messageBus = require("globals").messageBus;
messageBus.publish("order.created", {orderNumber: 555}); |
In this gist https://gist.github.com/richzw/6b4d348c6b8abdc8176e, I want to var Sender = require('./sender.js');
var sender = new Sender( amqpAddress, 3000);
function sendMessage( key, msg ) {
var byteBuffer = msg.encode().toBuffer();
sender.deliverMessage( key, byteBuffer );
}
// send message every 5 seconds
var CronJob = require('cron').CronJob;
var send_message_job = new CronJob({
cronTime: '*/5 * * * * *',
onTick: function(){
sendMessage( key, message );
},
start: true,
});
send_message_job.start(); Then I try to stop and start
Can anyone help me figure out how to handle it? |
@richzw sorry I can't help you |
Ok, I've taken a stab at fixing this whole reconnection problem. Check out https://github.com/benbria/node-amqp-connection-manager and see if it solves your use case. If it doesn't, or if you have suggestions, I'm certainly interested in hearing about it. :) |
Is there something horribly wrong with what I've done here, in regards to re-establishing a connection? It seems to work to reconnect to the server (I've been starting/stopping it for testing...) function connectRMQ() {
amqp.connect(config.rabbitmq.URI).then(function(conn) {
conn.on('close', function() {
console.error('Lost connection to RMQ. Reconnecting in 60 seconds...');
return setTimeout(connectRMQ, 60 * 1000);
});
return conn.createChannel().then(function(ch) {
var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
ok = ok.then(function() {
ch.prefetch(1);
ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
});
return ok.then(function() {
console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
});
function doWork(msg) {
var body = msg.content.toString();
console.log(" [x] Received %s", body);
setTimeout(function() {
ch.ack(msg);
}, config.rabbitmq.timeout);
}
});
}).then(null, function() {
setTimeout(connectRMQ, 10 * 1000);
return console.log('connection failed');
});
}
connectRMQ(); |
my implemention.
|
Ha, just see ioredis Redis client do connection recover stuff well code |
In my experience we have to listen to both the
I went ahead and subscribed to both, however, now in situation 1 both events are emitted, and if we're not careful we'll end up reconnecting twice, which eventually leads to function connect(onMessage, attempt = 0) {
let reconnecting = false;
function _onConnectionError(err) {
if (reconnecting) {
return Promise.resolve();
}
reconnecting = true;
const nextDelay = nextExponentialDelay(attempt, MAX_DELAY, DELAY_STEP, DELAY_RATE);
logger.error(`AMQP channel error, retrying in ${nextDelay} s`, err);
return new Promise(resolve => setTimeout(() => connect(onMessage, attempt + 1).then(resolve), nextDelay * 1000));
}
return mq.connect(amqpConn).then(connection => {
return connection.createChannel().then(channel => {
// recreate the exchange/queue topology as needed, then:
attempt = 0;
reconnecting = false;
connection.once("close", _onConnectionError);
connection.once("error", _onConnectionError);
logger.info("Waiting for messages");
return channel;
});
}).catch(_onConnectionError);
} |
``> In my experience we have to listen to both the
it works nice, one thing to add would be some condition to break, in case you wont like infinite restart loop
|
Here is my workaround: class ReconnectableProxy {
constructor(target) {
this.target = target
this.callLog = []
this.proxy = new Proxy({}, this.createHandler())
}
createHandler() {
return {
get: (obj, prop) => {
if (typeof this.target[prop] === "function") {
return (...args) => {
this.callLog.push({ method: prop, args })
return this.target[prop](...args)
}
}
return this.target[prop]
},
}
}
setTarget(newTarget) {
this.target = newTarget
}
async reconnect(newTarget) {
this.target = newTarget
await this.replayCalls()
}
async replayCalls() {
for (const { method, args } of this.callLog) {
await this.target[method](...args)
}
}
getProxy() {
return this.proxy
}
}
const proxyManager = new ReconnectableProxy()
let reconnecting = false
const createConnection = async () => {
const onConnectionError = async () => {
if (reconnecting) {
return
}
reconnecting = true
await yaRetry(
async (_bail) => {
logger.debug("rabbitmq disconnected, trying to reconnect")
const conn = await createConnection()
await proxyManager.reconnect(conn)
logger.debug("reconnected")
},
{
retries: 10,
minTimeout: 1000,
maxTimeout: 30000,
...(autoReconnect.retryOptions || {}),
}
)
reconnecting = false
}
const conn = await amqplib.connect(amqpURL)
conn.on("close", onConnectionError)
// conn.on("error", onConnectionError)
return conn
}
const conn = await createConnection()
proxyManager.setTarget(conn)
const connProxy = proxyManager.getProxy()
// then use connProxy in the code instead of conn imlpemented here: https://codeberg.org/devthefuture/modjo/src/branch/master/plugins/amqp/index.js |
Something to watch out for... conn.once("error", onConnectionError) You can get two error events. Because once will remove the error handler after the first, the second will be unhandled and can crash your application. See onebeyond/rascal#122 for a little more context. |
thanks @cressie176 for pointing this, I'm just realizing it now and I'm trying to find a better implementation, I will edit my post when it will be done edit: finally I edit with only the change to on, and removing on error, my final implementation will be a lot more complicated, but I want to handle channel auto-reconnect independently of connection (is this a good idea ?) |
At the moment, when a socket closes it's basically permanent. All kinds of methods are replaced in favor of functions which only
throw
and there's no reconnect support.Even in a data center, connections can fail or time out, and reconnection would allow the connection to fail and recover from the failure, instead of simply giving up.
This might need to happen at the
connect.js
level given the current setup, but that would put messages that have been sent but not acked into an uncertain place where it's unclear if the message has reached the server or just fallen. I know from experience that trying to get reconnection to work well can be tricky. For a counterexample, seenode-amqp
's reconnection, which appears to leak everywhere.The text was updated successfully, but these errors were encountered: