Skip to content

Commit

Permalink
Allowing to add handlers while router is already running (#266)
Browse files Browse the repository at this point in the history
* Allowing to add handlers while router is already running

* Fixed race condition

* Added option to stop Handler

* fmt

* checking if router is running in RunHandlers

* fix race condition

* bugfix

* fix race

* added docs

* fix race condition

* speedup TestFanIn
  • Loading branch information
roblaszczak authored Jan 29, 2022
1 parent b2fb298 commit 241bc78
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 58 deletions.
6 changes: 3 additions & 3 deletions components/fanin/fanin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func TestFanIn(t *testing.T) {
upstreamTopicPattern = "upstream-topic-%d"
downstreamTopic = "downstream-topic"

cancelAfter = time.Second
cancelAfter = time.Millisecond * 100

workersCount = 5
messagesCount = 100
workersCount = 3
messagesCount = 10
upstreamTopicsCount = 5
)

Expand Down
20 changes: 20 additions & 0 deletions docs/content/docs/messages-router.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ It can be useful to know if the router is running. You can use the `Running()` m
{{% load-snippet-partial file="src-link/message/router.go" first_line_contains="// Running" last_line_contains="func (r *Router) Running()" padding_after="0" %}}
{{% /render-md %}}


### Adding handler after the router has started

You can add a new handler while the router is already running.
To do that, you need to call `AddNoPublisherHandler` or `AddHandler` and call `RunHandlers`.

{{% render-md %}}
{{% load-snippet-partial file="src-link/message/router.go" first_line_contains="// RunHandlers" last_line_contains="func (r *Router) RunHandlers" padding_after="0" %}}
{{% /render-md %}}

### Stopping running handler

It is possible to stop the running handler by calling `Stop()`.

Please keep in mind, that router will be closed when there are no running handlers.

{{% render-md %}}
{{% load-snippet-partial file="src-link/message/router.go" first_line_contains="// Stop" last_line_contains="func (h *Handler) Stop()" padding_after="0" %}}
{{% /render-md %}}

### Execution models

*Subscribers* can consume either one message at a time or multiple messages in parallel.
Expand Down
2 changes: 1 addition & 1 deletion internal/norace.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build !race
//go:build !race

package internal

Expand Down
2 changes: 1 addition & 1 deletion internal/race.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build race
//go:build race

package internal

Expand Down
184 changes: 140 additions & 44 deletions message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func NewRouter(config RouterConfig, logger watermill.LoggerAdapter) (*Router, er

handlersWg: &sync.WaitGroup{},
runningHandlersWg: &sync.WaitGroup{},
handlerAdded: make(chan struct{}),

closeCh: make(chan struct{}),
closedCh: make(chan struct{}),
Expand Down Expand Up @@ -124,10 +125,12 @@ type Router struct {

plugins []RouterPlugin

handlers map[string]*handler
handlers map[string]*handler
handlersLock sync.RWMutex

handlersWg *sync.WaitGroup
runningHandlersWg *sync.WaitGroup
handlerAdded chan struct{}

closeCh chan struct{}
closedCh chan struct{}
Expand Down Expand Up @@ -224,6 +227,8 @@ func (d DuplicateHandlerNameError) Error() string {
// When handler needs to publish to multiple topics,
// it is recommended to just inject Publisher to Handler or implement middleware
// which will catch messages and publish to topic based on metadata for example.
//
// If handler is added while router is already running, you need to explicitly call RunHandlers().
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
Expand All @@ -237,6 +242,9 @@ func (r *Router) AddHandler(
"topic": subscribeTopic,
})

r.handlersLock.Lock()
defer r.handlersLock.Unlock()

if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
Expand All @@ -258,11 +266,20 @@ func (r *Router) AddHandler(
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
messagesCh: nil,
closeCh: r.closeCh,
routersCloseCh: r.closeCh,

startedCh: make(chan struct{}),
}

r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler

select {
case r.handlerAdded <- struct{}{}:
default:
// closeWhenAllHandlersStopped is not always waiting for handlerAdded
}

return &Handler{
router: r,
handler: newHandler,
Expand All @@ -278,6 +295,8 @@ func (r *Router) AddHandler(
// subscribeTopic is a topic from which handler will receive messages.
//
// subscriber is Subscriber from which messages will be consumed.
//
// If handler is added while router is already running, you need to explicitly call RunHandlers().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
Expand Down Expand Up @@ -317,67 +336,111 @@ func (r *Router) Run(ctx context.Context) (err error) {
}
}

r.logger.Debug("Applying decorators", nil)
if err := r.RunHandlers(ctx); err != nil {
return err
}

close(r.running)

go r.closeWhenAllHandlersStopped()

<-r.closeCh
cancel()

r.logger.Info("Waiting for messages", watermill.LogFields{
"timeout": r.config.CloseTimeout,
})

<-r.closedCh

r.logger.Info("All messages processed", nil)

return nil
}

// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
if !r.isRunning {
return errors.New("you can't call RunHandlers on non-running router")
}

r.handlersLock.Lock()
defer r.handlersLock.Unlock()

for name, h := range r.handlers {
if err = r.decorateHandlerPublisher(h); err != nil {
name := name
h := h

if h.started {
continue
}

if err := r.decorateHandlerPublisher(h); err != nil {
return errors.Wrapf(err, "could not decorate publisher of handler %s", name)
}
if err = r.decorateHandlerSubscriber(h); err != nil {
if err := r.decorateHandlerSubscriber(h); err != nil {
return errors.Wrapf(err, "could not decorate subscriber of handler %s", name)
}
}

for _, h := range r.handlers {
r.logger.Debug("Subscribing to topic", watermill.LogFields{
"subscriber_name": h.name,
"topic": h.subscribeTopic,
})

ctx, cancel := context.WithCancel(ctx)

messages, err := h.subscriber.Subscribe(ctx, h.subscribeTopic)
if err != nil {
cancel()
return errors.Wrapf(err, "cannot subscribe topic %s", h.subscribeTopic)
}

h.messagesCh = messages
}

for i := range r.handlers {
handler := r.handlers[i]
h.started = true
close(h.startedCh)

r.handlersWg.Add(1)
h.stopFn = cancel
h.stopped = make(chan struct{})

go func() {
handler.run(r.middlewares)
defer cancel()

h.run(ctx, r.middlewares)

r.handlersWg.Done()
r.logger.Info("Subscriber stopped", watermill.LogFields{
"subscriber_name": handler.name,
"topic": handler.subscribeTopic,
"subscriber_name": h.name,
"topic": h.subscribeTopic,
})

r.handlersLock.Lock()
delete(r.handlers, name)
r.handlersLock.Unlock()
}()
}

close(r.running)

go r.closeWhenAllHandlersStopped()

<-r.closeCh
cancel()

r.logger.Info("Waiting for messages", watermill.LogFields{
"timeout": r.config.CloseTimeout,
})

<-r.closedCh

r.logger.Info("All messages processed", nil)

return nil
}

// closeWhenAllHandlersStopped closed router, when all handlers has stopped,
// because for example all subscriptions are closed.
func (r *Router) closeWhenAllHandlersStopped() {
r.handlersLock.RLock()
hasHandlers := len(r.handlers) == 0
r.handlersLock.RUnlock()

if hasHandlers {
// in that situation router will be closed immediately (even if they are no routers)
// let's wait for
select {
case <-r.handlerAdded:
// it should be some handler to track
case <-r.closedCh:
// let's avoid goroutine leak
return
}
}

r.handlersWg.Wait()
if r.isClosed() {
// already closed
Expand Down Expand Up @@ -406,6 +469,9 @@ func (r *Router) Close() error {
r.closedLock.Lock()
defer r.closedLock.Unlock()

r.handlersLock.Lock()
defer r.handlersLock.Unlock()

if r.closed {
return nil
}
Expand All @@ -419,7 +485,7 @@ func (r *Router) Close() error {

timeouted := sync_internal.WaitGroupTimeout(r.handlersWg, r.config.CloseTimeout)
if timeouted {
return errors.New("router close timeouted")
return errors.New("router close timeout")
}

return nil
Expand Down Expand Up @@ -450,10 +516,15 @@ type handler struct {

messagesCh <-chan *Message

closeCh chan struct{}
started bool
startedCh chan struct{}

stopFn context.CancelFunc
stopped chan struct{}
routersCloseCh chan struct{}
}

func (h *handler) run(middlewares []middleware) {
func (h *handler) run(ctx context.Context, middlewares []middleware) {
h.logger.Info("Starting handler", watermill.LogFields{
"subscriber_name": h.name,
"topic": h.subscribeTopic,
Expand All @@ -469,7 +540,7 @@ func (h *handler) run(middlewares []middleware) {
}
}

go h.handleClose()
go h.handleClose(ctx)

for msg := range h.messagesCh {
h.runningHandlersWg.Add(1)
Expand All @@ -485,6 +556,7 @@ func (h *handler) run(middlewares []middleware) {
}

h.logger.Debug("Router handler stopped", nil)
close(h.stopped)
}

// Handler handles Messages.
Expand All @@ -506,6 +578,27 @@ func (h *Handler) AddMiddleware(m ...HandlerMiddleware) {
h.router.addHandlerLevelMiddleware(handler.name, m...)
}

// Started returns channel which is stopped when handler is running.
func (h *Handler) Started() chan struct{} {
return h.handler.startedCh
}

// Stop stops the handler.
// Stop is asynchronous.
// You can check if handler was stopped with Stopped() function.
func (h *Handler) Stop() {
if !h.handler.started {
panic("handler is not started")
}

h.handler.stopFn()
}

// Stopped returns channel which is stopped when handler did stop.
func (h *Handler) Stopped() chan struct{} {
return h.handler.stopped
}

// decorateHandlerPublisher applies the decorator chain to handler's publisher.
// They are applied in reverse order, so that the later decorators use the result of former ones.
func (r *Router) decorateHandlerPublisher(h *handler) error {
Expand Down Expand Up @@ -574,16 +667,19 @@ func (h *handler) addHandlerContext(messages ...*Message) {
}
}

func (h *handler) handleClose() {
<-h.closeCh

h.logger.Debug("Waiting for subscriber to close", nil)

if err := h.subscriber.Close(); err != nil {
h.logger.Error("Failed to close subscriber", err, nil)
func (h *handler) handleClose(ctx context.Context) {
select {
case <-h.routersCloseCh:
// for backward compatibility we are closing subscriber
h.logger.Debug("Waiting for subscriber to close", nil)
if err := h.subscriber.Close(); err != nil {
h.logger.Error("Failed to close subscriber", err, nil)
}
h.logger.Debug("Subscriber closed", nil)
case <-ctx.Done():
// we are closing subscriber just when entire router is closed
}

h.logger.Debug("Subscriber closed", nil)
h.stopFn()
}

func (h *handler) handleMessage(msg *Message, handler HandlerFunc) {
Expand Down
Loading

0 comments on commit 241bc78

Please sign in to comment.