Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: support draining messages / removing nsqd from rotation #1305

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/nsqd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
Expand Down Expand Up @@ -66,6 +67,10 @@ func (p *program) Start() error {

options.Resolve(opts, flagSet, cfg)

if err := opts.Validate(); err != nil {
mreiferson marked this conversation as resolved.
Show resolved Hide resolved
log.Fatal(err)
}

nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
Expand Down
1 change: 1 addition & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
logLevel := opts.LogLevel
flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal")
flagSet.String("log-prefix", "[nsqd] ", "log message prefix")
flagSet.String("sigterm-mode", "shutdown", "action to take on a SIGTERM (shutdown, drain)")
flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level")

flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)")
Expand Down
26 changes: 21 additions & 5 deletions internal/http_api/topic_channel_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,40 @@ type getter interface {
Get(key string) (string, error)
}

func GetTopicChannelArgs(rp getter) (string, string, error) {
// GetTopicArg returns the ?topic parameter
func GetTopicArg(rp getter) (string, error) {
topicName, err := rp.Get("topic")
if err != nil {
return "", "", errors.New("MISSING_ARG_TOPIC")
return "", errors.New("MISSING_ARG_TOPIC")
}

if !protocol.IsValidTopicName(topicName) {
return "", "", errors.New("INVALID_ARG_TOPIC")
return "", errors.New("INVALID_ARG_TOPIC")
}
return topicName, nil
}

// GetChannelArg returns the ?channel parameter
func GetChannelArg(rp getter) (string, error) {
channelName, err := rp.Get("channel")
if err != nil {
return "", "", errors.New("MISSING_ARG_CHANNEL")
return "", errors.New("MISSING_ARG_CHANNEL")
}

if !protocol.IsValidChannelName(channelName) {
return "", "", errors.New("INVALID_ARG_CHANNEL")
return "", errors.New("INVALID_ARG_CHANNEL")
}
return channelName, nil
}

func GetTopicChannelArgs(rp getter) (string, string, error) {
topicName, err := GetTopicArg(rp)
if err != nil {
return "", "", err
}
channelName, err := GetChannelArg(rp)
if err != nil {
return "", "", err
}
return topicName, channelName, nil
}
18 changes: 18 additions & 0 deletions internal/test/assertions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package test

import (
"encoding/json"
"io/ioutil"
"net/http"
"path/filepath"
"reflect"
"runtime"
Expand Down Expand Up @@ -56,3 +59,18 @@ func isNil(object interface{}) bool {

return false
}

func HTTPError(t *testing.T, resp *http.Response, code int, message string) {
type ErrMessage struct {
Message string `json:"message"`
}

body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
t.Log(string(body))
Equal(t, code, resp.StatusCode)

var em ErrMessage
Nil(t, json.Unmarshal(body, &em))
Equal(t, message, em.Message)
}
15 changes: 15 additions & 0 deletions internal/test/guids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package test

import (
"sync/atomic"
)

// GUIDFactory is an atomic sequence that can be used for MessageID's for benchmarks
// to avoid ErrSequenceExpired when creating a large number of messages
type GUIDFactory struct {
n int64
}

func (gf *GUIDFactory) NextMessageID() int64 {
return atomic.AddInt64(&gf.n, 1)
}
38 changes: 22 additions & 16 deletions nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestHTTPTopicsGET(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_topics_get" + strconv.Itoa(int(time.Now().Unix()))
nsqds[0].GetTopic(topicName)
nsqds[0].GetOrCreateTopic(topicName)
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestHTTPTopicGET(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_topic_get" + strconv.Itoa(int(time.Now().Unix()))
nsqds[0].GetTopic(topicName)
nsqds[0].GetOrCreateTopic(topicName)
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand Down Expand Up @@ -253,8 +253,9 @@ func TestHTTPChannelGET(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_channel_get" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
topic.GetChannel("ch")
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand Down Expand Up @@ -292,8 +293,9 @@ func TestHTTPNodesSingleGET(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_nodes_single_get" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
topic.GetChannel("ch")
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand Down Expand Up @@ -376,7 +378,7 @@ func TestHTTPTombstoneTopicNodePOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_tombstone_topic_node_post" + strconv.Itoa(int(time.Now().Unix()))
nsqds[0].GetTopic(topicName)
nsqds[0].GetOrCreateTopic(topicName)
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand All @@ -399,7 +401,7 @@ func TestHTTPDeleteTopicPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_delete_topic_post" + strconv.Itoa(int(time.Now().Unix()))
nsqds[0].GetTopic(topicName)
nsqds[0].GetOrCreateTopic(topicName)
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand All @@ -419,8 +421,9 @@ func TestHTTPDeleteChannelPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_delete_channel_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
topic.GetChannel("ch")
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand All @@ -440,7 +443,7 @@ func TestHTTPPauseTopicPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_pause_topic_post" + strconv.Itoa(int(time.Now().Unix()))
nsqds[0].GetTopic(topicName)
nsqds[0].GetOrCreateTopic(topicName)
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand Down Expand Up @@ -474,8 +477,9 @@ func TestHTTPPauseChannelPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_pause_channel_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
topic.GetChannel("ch")
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.GetOrCreateChannel("ch")
time.Sleep(100 * time.Millisecond)

client := http.Client{}
Expand Down Expand Up @@ -509,7 +513,8 @@ func TestHTTPEmptyTopicPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_empty_topic_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
topic.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))
test.Equal(t, int64(1), topic.Depth())
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -537,8 +542,9 @@ func TestHTTPEmptyChannelPOST(t *testing.T) {
defer nsqadmin1.Exit()

topicName := "test_empty_channel_post" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
channel := topic.GetChannel("ch")
topic, err := nsqds[0].GetOrCreateTopic(topicName)
test.Nil(t, err)
channel := topic.GetOrCreateChannel("ch")
channel.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))

time.Sleep(100 * time.Millisecond)
Expand Down
63 changes: 60 additions & 3 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Channel struct {
// state tracking
clients map[int64]Consumer
paused int32
isDraining int32
ephemeral bool
deleteCallback func(*Channel)
deleter sync.Once
Expand Down Expand Up @@ -122,6 +123,20 @@ func NewChannel(topicName string, channelName string, nsqd *NSQD,
return c
}

// InFlightCount returns the number of messages that have been sent to a client, but not yet FIN or REQUEUE'd
func (c *Channel) InFlightCount() int64 {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
return int64(len(c.inFlightMessages))
}

// DeferredCount returns the number of messages that are queued in-memory for future delivery to a client
func (c *Channel) DeferredCount() int64 {
c.deferredMutex.Lock()
defer c.deferredMutex.Unlock()
return int64(len(c.deferredMessages))
}

func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10))

Expand All @@ -136,6 +151,23 @@ func (c *Channel) initPQ() {
c.deferredMutex.Unlock()
}

// StartDraining starts draining a channel
//
// if there are no outstanding messages the channel is deleted immediately
// if there are messages outstanding it's deleted by FinishMessage
func (c *Channel) StartDraining() {
if !atomic.CompareAndSwapInt32(&c.isDraining, 0, 1) {
return
}
depth, inFlight, deferred := c.Depth(), c.InFlightCount(), c.DeferredCount()
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining. depth:%d inFlight:%d deferred:%d", c.name, depth, inFlight, deferred)
// if we are empty delete
if depth+inFlight+deferred == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race condition b/w this line and line 162 (a concurrent pub that had already passed the isDraining check but lost the race w/ line 162)

go c.deleter.Do(func() { c.deleteCallback(c) })
}
// else cleanup happens on last FinishMessage
}

// Exiting returns a boolean indicating if this channel is closed/exiting
func (c *Channel) Exiting() bool {
return atomic.LoadInt32(&c.exitFlag) == 1
Expand Down Expand Up @@ -187,6 +219,9 @@ func (c *Channel) exit(deleted bool) error {
return c.backend.Close()
}

// Empty drains the channel of messages.
//
// If the channel is draining this will delete the channel
func (c *Channel) Empty() error {
c.Lock()
defer c.Unlock()
Expand All @@ -196,16 +231,27 @@ func (c *Channel) Empty() error {
client.Empty()
}

MemoryDrain:
for {
select {
case <-c.memoryMsgChan:
default:
goto finish
break MemoryDrain
}
}

finish:
return c.backend.Empty()
err := c.backend.Empty()

// `backend.Empty` always results in an internal empty state (even if on-disk state might differ)
// so we want to logically continue to finish draining if applicable.
if atomic.LoadInt32(&c.isDraining) == 1 {
go c.deleter.Do(func() { c.deleteCallback(c) })
}

if err != nil {
return err
}
return nil
}

// flush persists all the messages in internal memory buffers to the backend
Expand Down Expand Up @@ -346,6 +392,8 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti
}

// FinishMessage successfully discards an in-flight message
//
// if this channel is draining and this is the last message this will initiate a channel deletion
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
Expand All @@ -355,6 +403,15 @@ func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
if c.e2eProcessingLatencyStream != nil {
c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
}

if atomic.LoadInt32(&c.isDraining) == 1 {
// if last msg, delete
depth, inFlight, deferred := c.Depth(), c.InFlightCount(), c.DeferredCount()
if depth+inFlight+deferred == 0 {
c.nsqd.logf(LOG_INFO, "CHANNEL(%s): draining complete", c.name)
go c.deleter.Do(func() { c.deleteCallback(c) })
}
}
return nil
}

Expand Down
Loading