Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish fails after AwaitConnection #247

Open
bronger opened this issue Apr 4, 2024 · 14 comments
Open

Publish fails after AwaitConnection #247

bronger opened this issue Apr 4, 2024 · 14 comments

Comments

@bronger
Copy link

bronger commented Apr 4, 2024

I use the following code snipped to publish an MQTT message:

if err := cm.AwaitConnection(ctx); err != nil {
        return err
}
pr, err := cm.Publish(ctx, &paho.Publish{
        QoS:     QoS,
        Retain:  Retain,
        Topic:   topic,
        Payload: msg,
})

Then, I observe sometimes that err is “writev tcp 10.xxx.xxx.xxx:39508->192.168.xxx.xxx:1883: use of closed network connection”.

I’m confused, and I am not sure whether this is a bug. FWIW, my expectation was that after AwaitConnection, the connection is usable. Of course, it can be simply the MQTT broker having an outage in the wrong moment, but two of my jobs died simultaneously with this error message, so I don’t think its due to bad timing.

I use “github.com/eclipse/paho.golang v0.20.0”.

@MattBrittan
Copy link
Contributor

Please try 0.21 (probably will not help but slight chance the pinger bug, that was fixed, is the issue).

Otherwise, unfortunately, im going to need quite a bit more info (ideally code that replicates the issue, debug logs and broker logs). I understand this can be time consuming to pull together but its difficult to fix an issue I cannot replicate.

@bronger
Copy link
Author

bronger commented Apr 7, 2024

Okay, I’ll try.

But at the moment, the info that it should work is already very helpful to me, in the sense that my code snippet above does not lack e.g. a step between AwaitConnection and Publish.

@MattBrittan
Copy link
Contributor

Your code is basically the same as this example (it runs Publish in a go routine but the timing should be a bit the same). I test this by starting things up in docker containers and then interrupting the network connection and have not experienced the issue you are reporting (note that this error is to be expected when the connection drops).

So your code looks fine. If you are able to provide a reproducible example then I'll have a look (but please note that these issues can sometimes be difficult to replicate and may be broker dependent).

@bronger bronger changed the title Publish fails after AwaitCOnnection Publish fails after AwaitConnection Apr 12, 2024
@jhillyerd
Copy link

I ran into something similar today. It ended up being caused by a framework canceling the Context I provided to NewConnection() -- worth checking for anyone else seeing similar behavior.

@bronger
Copy link
Author

bronger commented May 2, 2024

As far as I’m concerned, this can be closed as I cannot reproduce it any more. I do see the error message if I have two programs running at the same time and using – erroneously – the same client ID. So maybe this was also the root cause of my original report.

@MattBrittan
Copy link
Contributor

I do see the error message if I have two programs running at the same time and using – erroneously – the same client ID.

Are you able to provide these programs? Even if this only happens very occasionally, and in specific circumstances, I'd like to fix it (sounds like there may be a race condition). The issue with this kind of issue is that if you cannot duplicate them then they are hard to trace/fix.

@bronger
Copy link
Author

bronger commented May 4, 2024

I put a mini project for reproducing the issue at https://gitlab.com/bronger/mqtt-test.

@MattBrittan
Copy link
Contributor

Thanks very much for putting this demonstration together.

In this case I believe the library is acting correctly; what appears to be happening is:

  1. Connection up (so cm.AwaitConnection returns)
  2. Connection dropped due to new connection from other client (with the same client id)
  3. cm.Publish attempts to send the PUBLISH packet; network connection is closed so this fails (loss of connection had not been detected previously).

The simplest way to avoid this would be to use PublishViaQueue which manages this process for you (and should correctly handle things like the MQTT session management which get a bit complex).

This is tangentially related to #249 (my comments on this PR anyway) in that the client attempts to reconnect immediately when dropped (so when using duplicate ID's the connect/drop/reconnect loop is very quick).

@bronger
Copy link
Author

bronger commented May 5, 2024

I see the error also with PublishViaQueue.

Probably one should consider AwaitConnection as a means of having necessary preconditons fulfilled for a successful publish. However, it is not sufficient. Thus, one should e.g. retry a publish a couple of times (before giving up)?

@MattBrittan
Copy link
Contributor

I see the error also with PublishViaQueue.

OK, I'll test this later in the week (that function should succeed regardless of the connection state; the message should go into the queue and be sent when possible). Currently there is no way to tell when the message has actually been sent (that's on the to-do list).

Probably one should consider AwaitConnection as a means of having necessary preconditons fulfilled for a successful publish. However, it is not sufficient. Thus, one should e.g. retry a publish a couple of times (before giving up)?

Well AwaitConnection does what it says - waits for the connection to come up. I don't think there is really anything we can do (other than returning an error as happens now) to cater for the situation where the connection drops between that happening and the next action (e.g. a Publish). The intention of PublishViaQueue is to provide an easy solution for this (where the library manages the process of sending the message as soon as possible but in order).

@MattBrittan
Copy link
Contributor

I have tested this with PublishViaQueue i.e.

func publishMQTT(ctx context.Context, c *autopaho.ConnectionManager, topic string, data any) error {
	msg, err := json.Marshal(data)
	if err != nil {
		panic(err)
	}
	if err := c.AwaitConnection(ctx); err != nil {
		// err can only be the cancelled context
		return err
	}
	err = c.PublishViaQueue(ctx, &autopaho.QueuePublish{
		Publish: &paho.Publish{
			Topic:   topic,
			Payload: msg,
		},
	})
	if err != nil {
		return fmt.Errorf("Error publishing to MQTT topic “%s”: %w", topic, err)
	} else {
		slog.Info("Queued MQTT message successfully", "message", string(msg), "topic", topic)
		return nil
	}
}

This appeared to work successfully (no errors reported) - however note that the AwaitConnection is not actually needed here (because the message will be queued regardless). I added a subscriber and it received a few messages (unsurprising it did not get more as the connection drops almost instantly).

So apologies but I've been unable to duplicate the second part of this (using PublishViaQueue) and the first bit seems as expected (connection can drop between c.AwaitConnection(ctx) returning and calling Publish).

@bronger
Copy link
Author

bronger commented May 9, 2024

Okay, thank you for your investigations and explanations!

With PublishViaQueue, for me it takes longer to get the error message but they come nevertheless. Anyway, I just wrap publishing in a retry cycle. I think this is absolutely fine. After all, one does the same thing with network connections.

@MattBrittan
Copy link
Contributor

With PublishViaQueue, for me it takes longer to get the error message but they come nevertheless.

Does your code look similar to mine? I'm struggling to see how you could get the same error because PublishViaQueue just adds the message to the queue and then returns (so returning use of closed network connection does not seem possible). OnClientError may still receive these errors but with autopaho you can ignore this (as it will reconnect automatically and resend the message).

@bronger
Copy link
Author

bronger commented May 9, 2024

Now revisiting it, I realise that the error is different. It is a WARN and not a panic any more. So everything is fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants