From c2ff17bed921b1c6c457f5eab1bb81a5ee3b125b Mon Sep 17 00:00:00 2001 From: Daniel Lohse Date: Wed, 10 Apr 2024 22:34:52 +0200 Subject: [PATCH] Move fallback topic option to the configuration and allow the topic to be configured --- pubsub/gochannel/pubsub.go | 24 +++++++++++++++++------- pubsub/gochannel/pubsub_test.go | 31 +++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index fb9bf9f42..99da72011 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -11,9 +11,10 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -// NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to. -// This is used if the `EnableFallback` configuration option is enabled. -const NoSubscribersFallbackTopic = "*" +// NoSubscribersFallbackDefaultTopic is the default fallback topic messages without any subscribers +// will be sent to – it is used if the `EnableNoSubscribersFallback` option is enabled and no +// fallback topic is configured via the `NoSubscribersFallbackTopic` option. +const NoSubscribersFallbackDefaultTopic = "*" // Config holds the GoChannel Pub/Sub's configuration options. type Config struct { @@ -32,8 +33,13 @@ type Config struct { BlockPublishUntilSubscriberAck bool // When true, messages sent to a topic without any subscribers will be sent to the - // subscribers of the `*` topic. - EnableFallback bool + // subscribers of the fallback topic (configured via `NoSubscribersFallbackTopic` option). + EnableNoSubscribersFallback bool + + // NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to. + // This is used if the `EnableNoSubscribersFallback` configuration option is enabled. + // If it's not set then `*` is used by default. + NoSubscribersFallbackTopic string } // GoChannel is the simplest Pub/Sub implementation. @@ -69,6 +75,10 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { logger = watermill.NopLogger{} } + if config.EnableNoSubscribersFallback && config.NoSubscribersFallbackTopic == "" { + config.NoSubscribersFallbackTopic = NoSubscribersFallbackDefaultTopic + } + return &GoChannel{ config: config, @@ -149,12 +159,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} if len(subscribers) == 0 { - if !g.config.EnableFallback { + if !g.config.EnableNoSubscribersFallback { return g.handleNoSubscribers(ackedBySubscribers, logFields) } g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields) - if subscribers = g.topicSubscribers(NoSubscribersFallbackTopic); len(subscribers) == 0 { + if subscribers = g.topicSubscribers(g.config.NoSubscribersFallbackTopic); len(subscribers) == 0 { return g.handleNoSubscribers(ackedBySubscribers, logFields) } } diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 758fad56f..ce358f70f 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -63,18 +63,41 @@ func TestPublishSubscribe_not_persistent(t *testing.T) { assert.NoError(t, pubSub.Close()) } -func TestPublishSubscribe_enable_fallback(t *testing.T) { +func TestPublishSubscribe_enable_no_subscribers_fallback(t *testing.T) { messagesCount := 100 pubSub := gochannel.NewGoChannel( gochannel.Config{ - OutputChannelBuffer: int64(messagesCount), - EnableFallback: true, + OutputChannelBuffer: int64(messagesCount), + EnableNoSubscribersFallback: true, + }, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackDefaultTopic) + require.NoError(t, err) + + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) + + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + + assert.NoError(t, pubSub.Close()) +} + +func TestPublishSubscribe_enable_no_subscribers_fallback_with_custom_topic(t *testing.T) { + messagesCount := 100 + pubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: int64(messagesCount), + EnableNoSubscribersFallback: true, + NoSubscribersFallbackTopic: "custom_fallback_topic", }, watermill.NewStdLogger(true, true), ) topicName := "test_topic_" + watermill.NewUUID() - msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackTopic) + msgs, err := pubSub.Subscribe(context.Background(), "custom_fallback_topic") require.NoError(t, err) sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName)