From fb560ec92a86a4faa478d87817345f93a1fe3dfe Mon Sep 17 00:00:00 2001 From: Boris Maslovsky Date: Wed, 28 Feb 2024 19:21:18 +0100 Subject: [PATCH 1/2] fix for republishing to the new reconnected session --- _examples/pubsub/pubsub.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/_examples/pubsub/pubsub.go b/_examples/pubsub/pubsub.go index 50c0937..1c8d3e5 100644 --- a/_examples/pubsub/pubsub.go +++ b/_examples/pubsub/pubsub.go @@ -91,11 +91,12 @@ func publish(sessions chan chan session, messages <-chan message) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + pending := make(chan message, 1) + for session := range sessions { var ( running bool reading = messages - pending = make(chan message, 1) confirm = make(chan amqp.Confirmation, 1) ) From 7ba88ccc1e897b764ba0c9b82c475e2025c1e1e3 Mon Sep 17 00:00:00 2001 From: Boris Maslovsky Date: Wed, 28 Feb 2024 19:22:04 +0100 Subject: [PATCH 2/2] fix to close deprecated connections on the reconnect --- _examples/pubsub/pubsub.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/_examples/pubsub/pubsub.go b/_examples/pubsub/pubsub.go index 1c8d3e5..1c8ebc7 100644 --- a/_examples/pubsub/pubsub.go +++ b/_examples/pubsub/pubsub.go @@ -118,6 +118,7 @@ func publish(sessions chan chan session, messages <-chan message) { select { case confirmed, ok := <-confirm: if !ok { + pub.Close() break Publish } if !confirmed.Ack { @@ -191,6 +192,7 @@ func subscribe(sessions chan chan session, messages chan<- message) { messages <- msg.Body sub.Ack(msg.DeliveryTag, false) } + sub.Close() } }