From 9088eb3863f7ab7808fe2e9b4fe9862a2a80aba0 Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Sun, 7 May 2023 16:45:52 +0200 Subject: [PATCH 1/6] Fix grammar in comment --- pubsub/gochannel/pubsub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index 0d4fc61c7..4770ad725 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -77,7 +77,7 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { } // Publish in GoChannel is NOT blocking until all consumers consume. -// Messages will be send in background. +// Messages will be sent in the background. // // Messages may be persisted or not, depending of persistent attribute. func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { From f6e8580db5df0b46d3680f01cd586fbf8f461966 Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Sun, 7 May 2023 16:57:27 +0200 Subject: [PATCH 2/6] Add `EnableFallback` configuration option and const for topic --- pubsub/gochannel/pubsub.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index 4770ad725..c607b19e2 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -11,6 +11,10 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) +// NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to. +// This is used if the `EnableFallback` configuration option is enabled. +const NoSubscribersFallbackTopic = "*" + // Config holds the GoChannel Pub/Sub's configuration options. type Config struct { // Output channel buffer size. @@ -26,6 +30,10 @@ type Config struct { // When true, Publish will block until subscriber Ack's the message. // If there are no subscribers, Publish will not block (also when Persistent is true). BlockPublishUntilSubscriberAck bool + + // When true, messages sent to a topic without any subscribers will be sent to the + // subscribers of the `*` topic. + EnableFallback bool } // GoChannel is the simplest Pub/Sub implementation. From 68cd6df7233dee0aa43f3755bce95cc37f3a20d5 Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Sun, 7 May 2023 16:45:36 +0200 Subject: [PATCH 3/6] Implement `EnableFallback` configuration option If the option is enabled, messages sent to topic without any subscribers will be forwarded to the `*` topic subscribers. --- pubsub/gochannel/pubsub.go | 19 +++++++++++++++---- pubsub/gochannel/pubsub_test.go | 22 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index c607b19e2..b243d31e0 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -148,10 +148,15 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} - if len(subscribers) == 0 { - close(ackedBySubscribers) - g.logger.Info("No subscribers to send message", logFields) - return ackedBySubscribers, nil + switch { + case len(subscribers) == 0 && g.config.EnableFallback: + g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields) + subscribers = g.topicSubscribers(NoSubscribersFallbackTopic) + if len(subscribers) == 0 { + return g.handleNoSubscribers(ackedBySubscribers, logFields) + } + case len(subscribers) == 0: + return g.handleNoSubscribers(ackedBySubscribers, logFields) } go func(subscribers []*subscriber) { @@ -174,6 +179,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan return ackedBySubscribers, nil } +func (g *GoChannel) handleNoSubscribers(ackedBySubscribers chan struct{}, logFields watermill.LogFields) (<-chan struct{}, error) { + close(ackedBySubscribers) + g.logger.Info("No subscribers to send the message to", logFields) + return ackedBySubscribers, nil +} + // Subscribe returns channel to which all published messages are sent. // Messages are not persisted. If there are no subscribers and message is produced it will be gone. // diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 012064091..758fad56f 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -63,6 +63,28 @@ func TestPublishSubscribe_not_persistent(t *testing.T) { assert.NoError(t, pubSub.Close()) } +func TestPublishSubscribe_enable_fallback(t *testing.T) { + messagesCount := 100 + pubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: int64(messagesCount), + EnableFallback: true, + }, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackTopic) + require.NoError(t, err) + + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) + + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + + assert.NoError(t, pubSub.Close()) +} + func TestPublishSubscribe_block_until_ack(t *testing.T) { pubSub := gochannel.NewGoChannel( gochannel.Config{BlockPublishUntilSubscriberAck: true}, From 0eae6f797c055b7c017e08be65b2a4f7abf7214c Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Sun, 7 Jan 2024 18:16:11 +0100 Subject: [PATCH 4/6] Improve implementation of the `EnableFallback` configuration option This goes back to the simple if and an early return when the option isn't enabled. --- pubsub/gochannel/pubsub.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index b243d31e0..fb9bf9f42 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -148,15 +148,15 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} - switch { - case len(subscribers) == 0 && g.config.EnableFallback: + if len(subscribers) == 0 { + if !g.config.EnableFallback { + return g.handleNoSubscribers(ackedBySubscribers, logFields) + } + g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields) - subscribers = g.topicSubscribers(NoSubscribersFallbackTopic) - if len(subscribers) == 0 { + if subscribers = g.topicSubscribers(NoSubscribersFallbackTopic); len(subscribers) == 0 { return g.handleNoSubscribers(ackedBySubscribers, logFields) } - case len(subscribers) == 0: - return g.handleNoSubscribers(ackedBySubscribers, logFields) } go func(subscribers []*subscriber) { From c2ff17bed921b1c6c457f5eab1bb81a5ee3b125b Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Wed, 10 Apr 2024 22:34:52 +0200 Subject: [PATCH 5/6] Move fallback topic option to the configuration and allow the topic to be configured --- pubsub/gochannel/pubsub.go | 24 +++++++++++++++++------- pubsub/gochannel/pubsub_test.go | 31 +++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index fb9bf9f42..99da72011 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -11,9 +11,10 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -// NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to. -// This is used if the `EnableFallback` configuration option is enabled. -const NoSubscribersFallbackTopic = "*" +// NoSubscribersFallbackDefaultTopic is the default fallback topic messages without any subscribers +// will be sent to – it is used if the `EnableNoSubscribersFallback` option is enabled and no +// fallback topic is configured via the `NoSubscribersFallbackTopic` option. +const NoSubscribersFallbackDefaultTopic = "*" // Config holds the GoChannel Pub/Sub's configuration options. type Config struct { @@ -32,8 +33,13 @@ type Config struct { BlockPublishUntilSubscriberAck bool // When true, messages sent to a topic without any subscribers will be sent to the - // subscribers of the `*` topic. - EnableFallback bool + // subscribers of the fallback topic (configured via `NoSubscribersFallbackTopic` option). + EnableNoSubscribersFallback bool + + // NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to. + // This is used if the `EnableNoSubscribersFallback` configuration option is enabled. + // If it's not set then `*` is used by default. + NoSubscribersFallbackTopic string } // GoChannel is the simplest Pub/Sub implementation. @@ -69,6 +75,10 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { logger = watermill.NopLogger{} } + if config.EnableNoSubscribersFallback && config.NoSubscribersFallbackTopic == "" { + config.NoSubscribersFallbackTopic = NoSubscribersFallbackDefaultTopic + } + return &GoChannel{ config: config, @@ -149,12 +159,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} if len(subscribers) == 0 { - if !g.config.EnableFallback { + if !g.config.EnableNoSubscribersFallback { return g.handleNoSubscribers(ackedBySubscribers, logFields) } g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields) - if subscribers = g.topicSubscribers(NoSubscribersFallbackTopic); len(subscribers) == 0 { + if subscribers = g.topicSubscribers(g.config.NoSubscribersFallbackTopic); len(subscribers) == 0 { return g.handleNoSubscribers(ackedBySubscribers, logFields) } } diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 758fad56f..ce358f70f 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -63,18 +63,41 @@ func TestPublishSubscribe_not_persistent(t *testing.T) { assert.NoError(t, pubSub.Close()) } -func TestPublishSubscribe_enable_fallback(t *testing.T) { +func TestPublishSubscribe_enable_no_subscribers_fallback(t *testing.T) { messagesCount := 100 pubSub := gochannel.NewGoChannel( gochannel.Config{ - OutputChannelBuffer: int64(messagesCount), - EnableFallback: true, + OutputChannelBuffer: int64(messagesCount), + EnableNoSubscribersFallback: true, + }, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackDefaultTopic) + require.NoError(t, err) + + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) + + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + + assert.NoError(t, pubSub.Close()) +} + +func TestPublishSubscribe_enable_no_subscribers_fallback_with_custom_topic(t *testing.T) { + messagesCount := 100 + pubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: int64(messagesCount), + EnableNoSubscribersFallback: true, + NoSubscribersFallbackTopic: "custom_fallback_topic", }, watermill.NewStdLogger(true, true), ) topicName := "test_topic_" + watermill.NewUUID() - msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackTopic) + msgs, err := pubSub.Subscribe(context.Background(), "custom_fallback_topic") require.NoError(t, err) sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) From 8808fd72d6f154d234c5af4ac281a946c0b7234e Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Wed, 10 Apr 2024 22:43:44 +0200 Subject: [PATCH 6/6] Improve clarity of comments on exported methods --- pubsub/gochannel/pubsub.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index 99da72011..e496f8168 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -25,7 +25,7 @@ type Config struct { // it will receive all previously produced messages. // // All messages are persisted to the memory (simple slice), - // so be aware that with large amount of messages you can go out of the memory. + // so be aware that with a large amount of messages you can run out of memory. Persistent bool // When true, Publish will block until subscriber Ack's the message. @@ -66,10 +66,10 @@ type GoChannel struct { persistedMessagesLock sync.RWMutex } -// NewGoChannel creates new GoChannel Pub/Sub. +// NewGoChannel creates a new GoChannel Pub/Sub. // -// This GoChannel is not persistent. -// That means if you send a message to a topic to which no subscriber is subscribed, that message will be discarded. +// By default, GoChannel isn't persistent; that means messages sent to a topic +// without any subscribers will be discarded if the fallback option isn't enabled. func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { if logger == nil { logger = watermill.NopLogger{} @@ -97,7 +97,7 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { // Publish in GoChannel is NOT blocking until all consumers consume. // Messages will be sent in the background. // -// Messages may be persisted or not, depending of persistent attribute. +// Messages may be persisted or not, depending on whether the persistent option is enabled. func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { if g.isClosed() { return errors.New("Pub/Sub closed")