Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Feb 9, 2019
1 parent 3a29c98 commit 673072e
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 145 deletions.
2 changes: 1 addition & 1 deletion nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
commands = append(commands, nsq.Register(topic.name, ""))
} else {
for _, channel := range topic.channelMap {
commands = append(commands, nsq.Register(channel.topicName, channel.name))
commands = append(commands, nsq.Register(topic.name, channel.name))
}
}
topic.RUnlock()
Expand Down
102 changes: 0 additions & 102 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,108 +153,6 @@ success:
<-doneExitChan
}

func TestMetadataMigrate(t *testing.T) {
old_meta := `
{
"topics": [
{
"channels": [
{"name": "c1", "paused": false},
{"name": "c2", "paused": false}
],
"name": "t1",
"paused": false
}
],
"version": "1.0.0-alpha"
}`

tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)

opts := NewOptions()
opts.DataPath = tmpDir
opts.Logger = test.NewTestLogger(t)

oldFn := oldMetadataFile(opts)
err = ioutil.WriteFile(oldFn, []byte(old_meta), 0600)
if err != nil {
panic(err)
}

_, _, nsqd := mustStartNSQD(opts)
err = nsqd.LoadMetadata()
test.Nil(t, err)
err = nsqd.PersistMetadata()
test.Nil(t, err)
nsqd.Exit()

oldFi, err := os.Lstat(oldFn)
test.Nil(t, err)
test.Equal(t, oldFi.Mode()&os.ModeType, os.ModeSymlink)

_, _, nsqd = mustStartNSQD(opts)
err = nsqd.LoadMetadata()
test.Nil(t, err)

t1, err := nsqd.GetExistingTopic("t1")
test.Nil(t, err)
test.NotNil(t, t1)
c2, err := t1.GetExistingChannel("c2")
test.Nil(t, err)
test.NotNil(t, c2)

nsqd.Exit()
}

func TestMetadataConflict(t *testing.T) {
t.Skipf("fails because New() uses os.Exit for failure")

old_meta := `
{
"topics": [{
"name": "t1", "paused": false,
"channels": [{"name": "c1", "paused": false}]
}],
"version": "1.0.0-alpha"
}`
new_meta := `
{
"topics": [{
"name": "t2", "paused": false,
"channels": [{"name": "c2", "paused": false}]
}],
"version": "1.0.0-alpha"
}`

tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)

opts := NewOptions()
opts.DataPath = tmpDir
opts.Logger = test.NewTestLogger(t)

err = ioutil.WriteFile(oldMetadataFile(opts), []byte(old_meta), 0600)
if err != nil {
panic(err)
}
err = ioutil.WriteFile(newMetadataFile(opts), []byte(new_meta), 0600)
if err != nil {
panic(err)
}

// _, _, nsqd := mustStartNSQD(opts)
// err = nsqd.LoadMetadata()
// test.NotNil(t, err)
// nsqd.Exit()
}

func TestEphemeralTopicsAndChannels(t *testing.T) {
// ephemeral topics/channels are lazily removed after the last channel/client is removed
opts := NewOptions()
Expand Down
2 changes: 1 addition & 1 deletion nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
}

client.PublishedMessage(topicName, uint64(len(messages)))
client.PublishedMessage(topicName, uint64(len(entries)))

return okBytes, nil
}
Expand Down
40 changes: 0 additions & 40 deletions nsqd/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,43 +116,3 @@ func TestClientAttributes(t *testing.T) {
test.Equal(t, userAgent, d.Topics[0].Channels[0].Clients[0].UserAgent)
test.Equal(t, true, d.Topics[0].Channels[0].Clients[0].Snappy)
}

func TestStatsChannelLocking(t *testing.T) {
opts := NewOptions()
opts.Logger = test.NewTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")

var wg sync.WaitGroup

wg.Add(2)
go func() {
for i := 0; i < 25; i++ {
msg := NewMessage(topic.GenerateID(), []byte("test"))
topic.PutMessage(msg)
channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
}
wg.Done()
}()

go func() {
for i := 0; i < 25; i++ {
nsqd.GetStats("", "", true)
}
wg.Done()
}()

wg.Wait()

stats := nsqd.GetStats(topicName, "channel", false)
t.Logf("stats: %+v", stats)

test.Equal(t, 1, len(stats))
test.Equal(t, 1, len(stats[0].Channels))
test.Equal(t, 25, stats[0].Channels[0].InFlightCount)
}
2 changes: 1 addition & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (t *Topic) Pub(entries []wal.EntryWriterTo) error {
atomic.AddUint64(&t.messageCount, uint64(len(entries)))
var total uint64
for _, e := range entries {
total += uint64(len(e.Body))
total += uint64(e.Len())
}
atomic.AddUint64(&t.messageBytes, total)
return nil
Expand Down

0 comments on commit 673072e

Please sign in to comment.