Skip to content

Commit

Permalink
Added support for Event Handler groups and new CQRS public API (#367)
Browse files Browse the repository at this point in the history
Co-authored-by: Miłosz Smółka <[email protected]>
  • Loading branch information
roblaszczak and m110 authored Jun 30, 2023
1 parent 2a1fb82 commit 7cb0e23
Show file tree
Hide file tree
Showing 33 changed files with 3,019 additions and 367 deletions.
2 changes: 0 additions & 2 deletions _examples/basic/2-realtime-feed/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ func main() {
logger := watermill.NewStdLogger(false, false)
logger.Info("Starting the producer", watermill.LogFields{})

rand.Seed(time.Now().Unix())

publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: brokers,
Expand Down
2 changes: 1 addition & 1 deletion _examples/basic/5-cqrs-protobuf/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
golang:
image: golang:1.19
image: golang:1.20
restart: unless-stopped
ports:
- 8080:8080
Expand Down
2 changes: 1 addition & 1 deletion _examples/basic/5-cqrs-protobuf/go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module main.go

require (
github.com/ThreeDotsLabs/watermill v1.2.0-rc.11
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1
Expand Down
8 changes: 8 additions & 0 deletions _examples/basic/5-cqrs-protobuf/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
github.com/ThreeDotsLabs/watermill v1.2.0-rc.11 h1:tQJ3L/AnfliXaxaq+ElHOfzi0Vx+AN8cAnIOLcUTrxo=
github.com/ThreeDotsLabs/watermill v1.2.0-rc.11/go.mod h1:QLZSaklpSZ/7yv288LL2DFOgCEi86VYEmQvzmaMlHoA=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620142403-c0e20f18aef0 h1:920Tfprg3Lwn7ieUOtnSZSz73UhZLeqzkf/fozF5vZ4=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620142403-c0e20f18aef0/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620191859-b41f6a2770be h1:7c3tZkJ3w2jB0S9xRkUvMNVUD/AE49c6wAFr1AyMy/g=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230620191859-b41f6a2770be/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230622094202-6eb02dc0b0b8 h1:S+6+P94VcfyiJCtmP6q8mGoMuSGmeAZc8lfIbDJ1R/E=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230622094202-6eb02dc0b0b8/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc h1:j8hIjk/pE05TmiEoiljsJPTZCL/4OJEUifWoFlMs0HI=
github.com/ThreeDotsLabs/watermill v1.2.1-0.20230623082929-7fe0ca7ad2cc/go.mod h1:zn/7F0TGOr1K/RX7bFbVxii6p1abOMLllAMpVpKinQg=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7 h1:AUSXLqdsA1LXWuoQSkIRG9FhMo6EYM9GSgd+bnf1W0w=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.0.7/go.mod h1:DKOBUoMVtPMV8jBEbRP3NL6TgnOMyRvVza3W297cdqU=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
Expand Down
163 changes: 128 additions & 35 deletions _examples/basic/5-cqrs-protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,56 +207,149 @@ func main() {
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)

// cqrs.Facade is facade for Command and Event buses and processors.
// You can use facade, or create buses and processors manually (you can inspire with cqrs.NewFacade)
cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
GenerateCommandsTopic: func(commandName string) string {
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return commandName
return params.CommandName, nil
},
CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
return []cqrs.CommandHandler{
BookRoomHandler{eb},
OrderBeerHandler{eb},
}
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})

params.Message.Metadata.Set("sent_at", time.Now().String())

return nil
},
CommandsPublisher: commandsPublisher,
CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}

commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()

err := params.Handler.Handle(params.Message.Context(), params.Command)

logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})

return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
GenerateEventsTopic: func(eventName string) string {
)
if err != nil {
panic(err)
}

eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events"
return "events", nil

// we can also use topic per event type
// return eventName
// return params.EventName, nil
},
EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
OrderBeerOnRoomBooked{cb},
NewBookingsFinancialReport(),
}
},
EventsPublisher: eventsPublisher,
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(handlerName),
)

return amqp.NewSubscriber(config, logger)

OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})

params.Message.Metadata.Set("published_at", time.Now().String())

return nil
},
Router: router,
CommandEventMarshaler: cqrsMarshaler,
Logger: logger,

Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}

eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)

return amqp.NewSubscriber(config, logger)
},

OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()

err := params.Handler.Handle(params.Message.Context(), params.Event)

logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})

return err
},

Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}

err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}

err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},

NewBookingsFinancialReport(),

cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}

// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(cqrsFacade.CommandBus())
go publishCommands(commandBus)

// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions _examples/real-world-examples/consumer-groups/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"context"
"math/rand"
"net/http"
"sync"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream"
Expand All @@ -16,7 +14,6 @@ import (
)

func main() {
rand.Seed(time.Now().UnixNano())
logger := watermill.NewStdLogger(false, false)

router, err := message.NewRouter(message.RouterConfig{}, logger)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package main

import (
"math/rand"
"net/http"
"time"

"github.com/ThreeDotsLabs/watermill"
)

func main() {
rand.Seed(time.Now().Unix())

logger := watermill.NewStdLogger(false, false)

postsStorage := NewPostsStorage()
Expand Down
Loading

0 comments on commit 7cb0e23

Please sign in to comment.