diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index 0d4fc61c7..e496f8168 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -11,6 +11,11 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) +// NoSubscribersFallbackDefaultTopic is the default fallback topic messages without any subscribers +// will be sent to – it is used if the `EnableNoSubscribersFallback` option is enabled and no +// fallback topic is configured via the `NoSubscribersFallbackTopic` option. +const NoSubscribersFallbackDefaultTopic = "*" + // Config holds the GoChannel Pub/Sub's configuration options. type Config struct { // Output channel buffer size. @@ -20,12 +25,21 @@ type Config struct { // it will receive all previously produced messages. // // All messages are persisted to the memory (simple slice), - // so be aware that with large amount of messages you can go out of the memory. + // so be aware that with a large amount of messages you can run out of memory. Persistent bool // When true, Publish will block until subscriber Ack's the message. // If there are no subscribers, Publish will not block (also when Persistent is true). BlockPublishUntilSubscriberAck bool + + // When true, messages sent to a topic without any subscribers will be sent to the + // subscribers of the fallback topic (configured via `NoSubscribersFallbackTopic` option). + EnableNoSubscribersFallback bool + + // NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to. + // This is used if the `EnableNoSubscribersFallback` configuration option is enabled. + // If it's not set then `*` is used by default. + NoSubscribersFallbackTopic string } // GoChannel is the simplest Pub/Sub implementation. @@ -52,15 +66,19 @@ type GoChannel struct { persistedMessagesLock sync.RWMutex } -// NewGoChannel creates new GoChannel Pub/Sub. +// NewGoChannel creates a new GoChannel Pub/Sub. // -// This GoChannel is not persistent. -// That means if you send a message to a topic to which no subscriber is subscribed, that message will be discarded. +// By default, GoChannel isn't persistent; that means messages sent to a topic +// without any subscribers will be discarded if the fallback option isn't enabled. func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { if logger == nil { logger = watermill.NopLogger{} } + if config.EnableNoSubscribersFallback && config.NoSubscribersFallbackTopic == "" { + config.NoSubscribersFallbackTopic = NoSubscribersFallbackDefaultTopic + } + return &GoChannel{ config: config, @@ -77,9 +95,9 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel { } // Publish in GoChannel is NOT blocking until all consumers consume. -// Messages will be send in background. +// Messages will be sent in the background. // -// Messages may be persisted or not, depending of persistent attribute. +// Messages may be persisted or not, depending on whether the persistent option is enabled. func (g *GoChannel) Publish(topic string, messages ...*message.Message) error { if g.isClosed() { return errors.New("Pub/Sub closed") @@ -141,9 +159,14 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic} if len(subscribers) == 0 { - close(ackedBySubscribers) - g.logger.Info("No subscribers to send message", logFields) - return ackedBySubscribers, nil + if !g.config.EnableNoSubscribersFallback { + return g.handleNoSubscribers(ackedBySubscribers, logFields) + } + + g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields) + if subscribers = g.topicSubscribers(g.config.NoSubscribersFallbackTopic); len(subscribers) == 0 { + return g.handleNoSubscribers(ackedBySubscribers, logFields) + } } go func(subscribers []*subscriber) { @@ -166,6 +189,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan return ackedBySubscribers, nil } +func (g *GoChannel) handleNoSubscribers(ackedBySubscribers chan struct{}, logFields watermill.LogFields) (<-chan struct{}, error) { + close(ackedBySubscribers) + g.logger.Info("No subscribers to send the message to", logFields) + return ackedBySubscribers, nil +} + // Subscribe returns channel to which all published messages are sent. // Messages are not persisted. If there are no subscribers and message is produced it will be gone. // diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 012064091..ce358f70f 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -63,6 +63,51 @@ func TestPublishSubscribe_not_persistent(t *testing.T) { assert.NoError(t, pubSub.Close()) } +func TestPublishSubscribe_enable_no_subscribers_fallback(t *testing.T) { + messagesCount := 100 + pubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: int64(messagesCount), + EnableNoSubscribersFallback: true, + }, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackDefaultTopic) + require.NoError(t, err) + + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) + + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + + assert.NoError(t, pubSub.Close()) +} + +func TestPublishSubscribe_enable_no_subscribers_fallback_with_custom_topic(t *testing.T) { + messagesCount := 100 + pubSub := gochannel.NewGoChannel( + gochannel.Config{ + OutputChannelBuffer: int64(messagesCount), + EnableNoSubscribersFallback: true, + NoSubscribersFallbackTopic: "custom_fallback_topic", + }, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgs, err := pubSub.Subscribe(context.Background(), "custom_fallback_topic") + require.NoError(t, err) + + sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName) + receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second) + + tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs) + + assert.NoError(t, pubSub.Close()) +} + func TestPublishSubscribe_block_until_ack(t *testing.T) { pubSub := gochannel.NewGoChannel( gochannel.Config{BlockPublishUntilSubscriberAck: true},