Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: switch to Klaus Post's flate compression library #1488

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/nsq_to_file/file_logger.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"compress/gzip"
"errors"
"fmt"
"io"
@@ -11,6 +10,7 @@ import (
"strings"
"time"

"github.com/klauspost/compress/gzip"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/lg"
)
@@ -331,7 +331,7 @@ func (f *FileLogger) updateFile() {
} else {
openFlag |= os.O_APPEND
}
f.out, err = os.OpenFile(absFilename, openFlag, 0666)
f.out, err = os.OpenFile(absFilename, openFlag, 0o666)
if err != nil {
if os.IsExist(err) {
f.logf(lg.WARN, "[%s/%s] working file already exists: %s", f.topic, f.opts.Channel, absFilename)
@@ -369,7 +369,7 @@ func (f *FileLogger) updateFile() {
func makeDirFromPath(logf lg.AppLogFunc, path string) error {
dir, _ := filepath.Split(path)
if dir != "" {
return os.MkdirAll(dir, 0770)
return os.MkdirAll(dir, 0o770)
}
return nil
}
7 changes: 4 additions & 3 deletions bench/bench_channels/bench_channels.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
var (
num = flag.Int("num", 10000, "num channels")
tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
maxMsgSize = int32(1024 * 1024)
)

func main() {
@@ -56,10 +57,10 @@ func subWorker(n int, tcpAddr string,
<-goChan
nsq.Ready(rdyCount).WriteTo(rw)
rw.Flush()
nsq.ReadResponse(rw)
nsq.ReadResponse(rw)
nsq.ReadResponse(rw, maxMsgSize)
nsq.ReadResponse(rw, maxMsgSize)
for {
resp, err := nsq.ReadResponse(rw)
resp, err := nsq.ReadResponse(rw, maxMsgSize)
if err != nil {
panic(err.Error())
}
7 changes: 4 additions & 3 deletions bench/bench_reader/bench_reader.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ var (
channel = flag.String("channel", "ch", "channel to receive messages on")
deadline = flag.String("deadline", "", "deadline to start the benchmark run")
rdy = flag.Int("rdy", 2500, "RDY count to use")
maxMsgSize = int32(1024 * 1024)
)

var totalMsgCount int64
@@ -86,15 +87,15 @@ func subWorker(td time.Duration, workers int, tcpAddr string, topic string, chan
<-goChan
nsq.Ready(*rdy).WriteTo(rw)
rw.Flush()
nsq.ReadResponse(rw)
nsq.ReadResponse(rw)
nsq.ReadResponse(rw, maxMsgSize)
nsq.ReadResponse(rw, maxMsgSize)
var msgCount int64
go func() {
time.Sleep(td)
conn.Close()
}()
for {
resp, err := nsq.ReadResponse(rw)
resp, err := nsq.ReadResponse(rw, maxMsgSize)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
break
5 changes: 3 additions & 2 deletions bench/bench_writer/bench_writer.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ var (
size = flag.Int("size", 200, "size of messages")
batchSize = flag.Int("batch-size", 200, "batch size of messages")
deadline = flag.String("deadline", "", "deadline to start the benchmark run")
maxMsgSize = int32(1024 * 1024)
)

var totalMsgCount int64
@@ -87,7 +88,7 @@ func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte,
rdyChan <- 1
<-goChan
rw.Flush()
nsq.ReadResponse(rw)
nsq.ReadResponse(rw, maxMsgSize)
var msgCount int64
endTime := time.Now().Add(td)
for {
@@ -100,7 +101,7 @@ func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte,
if err != nil {
panic(err.Error())
}
resp, err := nsq.ReadResponse(rw)
resp, err := nsq.ReadResponse(rw, maxMsgSize)
if err != nil {
panic(err.Error())
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -11,9 +11,10 @@ require (
github.com/golang/snappy v0.0.4
github.com/judwhite/go-svc v1.2.1
github.com/julienschmidt/httprouter v1.3.0
github.com/klauspost/compress v1.17.8
github.com/mreiferson/go-options v1.0.0
github.com/nsqio/go-diskqueue v1.1.0
github.com/nsqio/go-nsq v1.1.0
github.com/nsqio/go-nsq v1.1.1-0.20230918004844-c2c38427f295
)

require (
11 changes: 4 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0=
@@ -13,27 +11,26 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/mreiferson/go-options v1.0.0 h1:RMLidydGlDWpL+lQTXo0bVIf/XT2CTq7AEJMoz5/VWs=
github.com/mreiferson/go-options v1.0.0/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
github.com/mreiferson/go-svc v1.2.2-0.20210815184239-7a96e00010f6 h1:NbuBXARvEXrYZ1SzN53ZpObeuwGhl1zvs/C+kzCggrQ=
github.com/mreiferson/go-svc v1.2.2-0.20210815184239-7a96e00010f6/go.mod h1:mo/P2JNX8C07ywpP9YtO2gnBgnUiFTHqtsZekJrUuTk=
github.com/nsqio/go-diskqueue v1.1.0 h1:r0dJ0DMXT3+2mOq+79cvCjnhoBxyGC2S9O+OjQrpe4Q=
github.com/nsqio/go-diskqueue v1.1.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/nsqio/go-nsq v1.1.1-0.20230918004844-c2c38427f295 h1:P5E8d4pd99K4UxkBmqt5y3klvz6hlZcLjyQGLmjwgxM=
github.com/nsqio/go-nsq v1.1.1-0.20230918004844-c2c38427f295/go.mod h1:gi3A+O9Z+6PsytlChJFv3ofbCEpnwmwdYNJkcDM1cxM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70 h1:SeSEfdIxyvwGJliREIJhRPPXvW6sDlLT+UQ3B0hD0NA=
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
5 changes: 3 additions & 2 deletions internal/http_api/compress.go
Original file line number Diff line number Diff line change
@@ -7,11 +7,12 @@
package http_api

import (
"compress/flate"
"compress/gzip"
"io"
"net/http"
"strings"

"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/gzip"
)

type compressResponseWriter struct {
10 changes: 10 additions & 0 deletions internal/test/logger.go
Original file line number Diff line number Diff line change
@@ -20,3 +20,13 @@ func (tl *testLogger) Output(maxdepth int, s string) error {
func NewTestLogger(tbl tbLog) Logger {
return &testLogger{tbl}
}

// NilLogger is a Logger that produces no output. It may be useful in
// benchmarks, where otherwise the output would be distracting. The benchmarking
// package restricts output to 10 lines anyway, so logging via the testing.B is
// not very useful.
type NilLogger struct{}

func (l NilLogger) Output(maxdepth int, s string) error {
return nil
}
2 changes: 1 addition & 1 deletion nsqd/client_v2.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package nsqd

import (
"bufio"
"compress/flate"
"crypto/tls"
"fmt"
"net"
@@ -12,6 +11,7 @@ import (
"time"

"github.com/golang/snappy"
"github.com/klauspost/compress/flate"
"github.com/nsqio/nsq/internal/auth"
)

28 changes: 19 additions & 9 deletions nsqd/guid_test.go
Original file line number Diff line number Diff line change
@@ -23,20 +23,30 @@ func BenchmarkGUIDUnsafe(b *testing.B) {
}

func BenchmarkGUID(b *testing.B) {
var okays, errors, fails int64
var previd guid
factory := &guidFactory{}
factory := NewGUIDFactory(37)
for i := 0; i < b.N; i++ {
id, err := factory.NewGUID()
if err != nil {
errors++
} else if id == previd {
fails++
b.Fail()
} else {
okays++
b.Fatal(err)
} else if id <= previd {
b.Fatal("repeated or descending id")
}
previd = id
id.Hex()
}
b.Logf("okays=%d errors=%d bads=%d", okays, errors, fails)
}

func TestGUID(t *testing.T) {
factory := NewGUIDFactory(1)
var previd guid
for i := 0; i < 1000; i++ {
id, err := factory.NewGUID()
if err != nil {
t.Fatal(err)
} else if id <= previd {
t.Fatal("repeated or descending id")
}
previd = id
}
}
14 changes: 9 additions & 5 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
@@ -519,9 +519,8 @@ func TestHTTPClientStats(t *testing.T) {
var d struct {
Topics []struct {
Channels []struct {
ClientCount int `json:"client_count"`
Clients []struct {
} `json:"clients"`
ClientCount int `json:"client_count"`
Clients []struct{} `json:"clients"`
} `json:"channels"`
} `json:"topics"`
Memory *struct{} `json:"memory,omitempty"`
@@ -899,14 +898,19 @@ func BenchmarkHTTPpub(b *testing.B) {
var wg sync.WaitGroup
b.StopTimer()
opts := NewOptions()
opts.Logger = test.NewTestLogger(b)
opts.Logger = test.NilLogger{}
opts.MemQueueSize = int64(b.N)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
msg := make([]byte, 256)
topicName := "bench_http_pub" + strconv.Itoa(int(time.Now().Unix()))
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
client := &http.Client{}
client := &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 1000,
MaxIdleConns: 0,
},
}
b.SetBytes(int64(len(msg)))
b.StartTimer()

235 changes: 166 additions & 69 deletions nsqd/protocol_v2_test.go

Large diffs are not rendered by default.

151 changes: 83 additions & 68 deletions nsqd/protocol_v2_unixsocket_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions nsqd/topic_test.go
Original file line number Diff line number Diff line change
@@ -199,7 +199,7 @@ func BenchmarkTopicPut(b *testing.B) {
b.StopTimer()
topicName := "bench_topic_put" + strconv.Itoa(b.N)
opts := NewOptions()
opts.Logger = test.NewTestLogger(b)
opts.Logger = test.NilLogger{}
opts.MemQueueSize = int64(b.N)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
@@ -218,7 +218,7 @@ func BenchmarkTopicToChannelPut(b *testing.B) {
topicName := "bench_topic_to_channel_put" + strconv.Itoa(b.N)
channelName := "bench"
opts := NewOptions()
opts.Logger = test.NewTestLogger(b)
opts.Logger = test.NilLogger{}
opts.MemQueueSize = int64(b.N)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
20 changes: 10 additions & 10 deletions nsqlookupd/nsqlookupd_test.go
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ func identify(t *testing.T, conn net.Conn) {
cmd, _ := nsq.Identify(ci)
_, err := cmd.WriteTo(conn)
test.Nil(t, err)
_, err = nsq.ReadResponse(conn)
_, err = nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)
}

@@ -89,7 +89,7 @@ func TestBasicLookupd(t *testing.T) {
identify(t, conn)

nsq.Register(topicName, "channel1").WriteTo(conn)
v, err := nsq.ReadResponse(conn)
v, err := nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)
test.Equal(t, []byte("OK"), v)

@@ -164,7 +164,7 @@ func TestChannelUnregister(t *testing.T) {
identify(t, conn)

nsq.Register(topicName, "ch1").WriteTo(conn)
v, err := nsq.ReadResponse(conn)
v, err := nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)
test.Equal(t, []byte("OK"), v)

@@ -175,7 +175,7 @@ func TestChannelUnregister(t *testing.T) {
test.Equal(t, 1, len(channels))

nsq.UnRegister(topicName, "ch1").WriteTo(conn)
v, err = nsq.ReadResponse(conn)
v, err = nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)
test.Equal(t, []byte("OK"), v)

@@ -211,11 +211,11 @@ func TestTombstoneRecover(t *testing.T) {
identify(t, conn)

nsq.Register(topicName, "channel1").WriteTo(conn)
_, err := nsq.ReadResponse(conn)
_, err := nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)

nsq.Register(topicName2, "channel2").WriteTo(conn)
_, err = nsq.ReadResponse(conn)
_, err = nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)

endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d",
@@ -258,7 +258,7 @@ func TestTombstoneUnregister(t *testing.T) {
identify(t, conn)

nsq.Register(topicName, "channel1").WriteTo(conn)
_, err := nsq.ReadResponse(conn)
_, err := nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)

endpoint := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s:%d",
@@ -274,7 +274,7 @@ func TestTombstoneUnregister(t *testing.T) {
test.Equal(t, 0, len(pr.Producers))

nsq.UnRegister(topicName, "").WriteTo(conn)
_, err = nsq.ReadResponse(conn)
_, err = nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)

time.Sleep(55 * time.Millisecond)
@@ -302,7 +302,7 @@ func TestInactiveNodes(t *testing.T) {
identify(t, conn)

nsq.Register(topicName, "channel1").WriteTo(conn)
_, err := nsq.ReadResponse(conn)
_, err := nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)

ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout))
@@ -335,7 +335,7 @@ func TestTombstonedNodes(t *testing.T) {
identify(t, conn)

nsq.Register(topicName, "channel1").WriteTo(conn)
_, err := nsq.ReadResponse(conn)
_, err := nsq.ReadResponse(conn, maxMsgSize)
test.Nil(t, err)

ci := clusterinfo.New(nil, http_api.NewClient(nil, ConnectTimeout, RequestTimeout))