From d2dba29e9545c5af80169e541b1170c3dd34b3b9 Mon Sep 17 00:00:00 2001 From: Dominic Evans <8060970+dnwe@users.noreply.github.com> Date: Thu, 17 Aug 2023 00:07:44 +0100 Subject: [PATCH] feat(gzip): switch to klauspost/compress gzip (#2600) As suggested in #2359, use klauspost/compress for our gzip implementation as we're already using the module for other compression algorithms and benchmarks show 1.5x-7x improvement Signed-off-by: Dominic Evans --- compress.go | 2 +- config.go | 2 +- decompress.go | 2 +- message_test.go | 6 +++--- record_test.go | 55 ++++++------------------------------------------- request_test.go | 3 +++ 6 files changed, 15 insertions(+), 55 deletions(-) diff --git a/compress.go b/compress.go index 504007a49..a7bd525bc 100644 --- a/compress.go +++ b/compress.go @@ -2,11 +2,11 @@ package sarama import ( "bytes" - "compress/gzip" "fmt" "sync" snappy "github.com/eapache/go-xerial-snappy" + "github.com/klauspost/compress/gzip" "github.com/pierrec/lz4/v4" ) diff --git a/config.go b/config.go index 990de93a2..f9299a8c9 100644 --- a/config.go +++ b/config.go @@ -1,7 +1,6 @@ package sarama import ( - "compress/gzip" "crypto/tls" "fmt" "io" @@ -9,6 +8,7 @@ import ( "regexp" "time" + "github.com/klauspost/compress/gzip" "github.com/rcrowley/go-metrics" "golang.org/x/net/proxy" ) diff --git a/decompress.go b/decompress.go index a01cefaa5..0a0998329 100644 --- a/decompress.go +++ b/decompress.go @@ -2,11 +2,11 @@ package sarama import ( "bytes" - "compress/gzip" "fmt" "sync" snappy "github.com/eapache/go-xerial-snappy" + "github.com/klauspost/compress/gzip" "github.com/pierrec/lz4/v4" ) diff --git a/message_test.go b/message_test.go index 82015a386..02d29134b 100644 --- a/message_test.go +++ b/message_test.go @@ -32,15 +32,15 @@ var ( } // value emptyGzipMessage = []byte{ - 132, 99, 80, 148, // CRC + 196, 46, 92, 177, // CRC 0x00, // magic version byte 0x01, // attribute flags 0xFF, 0xFF, 0xFF, 0xFF, // key // value - 0x00, 0x00, 0x00, 0x17, + 0x00, 0x00, 0x00, 0x14, 0x1f, 0x8b, 0x08, - 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 9, 110, 136, 0, 255, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, } emptyLZ4Message = []byte{ diff --git a/record_test.go b/record_test.go index ba87feb4d..8c0f751d7 100644 --- a/record_test.go +++ b/record_test.go @@ -2,9 +2,6 @@ package sarama import ( "reflect" - "runtime" - "strconv" - "strings" "testing" "time" @@ -141,27 +138,10 @@ func recordBatchTestCases() []struct { }, encoded: []byte{ 0, 0, 0, 0, 0, 0, 0, 0, // First Offset - 0, 0, 0, 94, // Length + 0, 0, 0, 95, // Length 0, 0, 0, 0, // Partition Leader Epoch - 2, // Version - 159, 236, 182, 189, // CRC - 0, 1, // Attributes - 0, 0, 0, 0, // Last Offset Delta - 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp - 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp - 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID - 0, 0, // Producer Epoch - 0, 0, 0, 0, // First Sequence - 0, 0, 0, 1, // Number of Records - 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101, - 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0, - }, - oldGoEncoded: []byte{ - 0, 0, 0, 0, 0, 0, 0, 0, // First Offset - 0, 0, 0, 94, // Length - 0, 0, 0, 0, // Partition Leader Epoch - 2, // Version - 0, 216, 14, 210, // CRC + 2, // Version + 231, 74, 206, 165, // CRC 0, 1, // Attributes 0, 0, 0, 0, // Last Offset Delta 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp @@ -170,8 +150,8 @@ func recordBatchTestCases() []struct { 0, 0, // Producer Epoch 0, 0, 0, 0, // First Sequence 0, 0, 0, 1, // Number of Records - 31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101, - 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0, + 31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 0, 21, 0, 234, 255, 40, 0, 10, 0, 8, 1, 2, + 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12, 3, 0, 173, 201, 88, 103, 21, 0, 0, 0, }, }, { @@ -250,32 +230,9 @@ func recordBatchTestCases() []struct { } } -func isOldGo(t *testing.T) bool { - v := strings.Split(runtime.Version()[2:], ".") - if len(v) < 2 { - t.Logf("Can't parse version: %s", runtime.Version()) - return false - } - maj, err := strconv.Atoi(v[0]) - if err != nil { - t.Logf("Can't parse version: %s", runtime.Version()) - return false - } - min, err := strconv.Atoi(v[1]) - if err != nil { - t.Logf("Can't parse version: %s", runtime.Version()) - return false - } - return maj < 1 || (maj == 1 && min < 8) -} - func TestRecordBatchEncoding(t *testing.T) { for _, tc := range recordBatchTestCases() { - if tc.oldGoEncoded != nil && isOldGo(t) { - testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded) - } else { - testEncodable(t, tc.name, &tc.batch, tc.encoded) - } + testEncodable(t, tc.name, &tc.batch, tc.encoded) } } diff --git a/request_test.go b/request_test.go index a28bda98e..70b09a89a 100644 --- a/request_test.go +++ b/request_test.go @@ -353,6 +353,7 @@ func TestAllocateBodyProtocolVersions(t *testing.T) { // implement the encoder or decoder interfaces that needed somewhere to live func testEncodable(t *testing.T, name string, in encoder, expect []byte) { + t.Helper() packet, err := encode(in, nil) if err != nil { t.Error(err) @@ -362,6 +363,7 @@ func testEncodable(t *testing.T, name string, in encoder, expect []byte) { } func testDecodable(t *testing.T, name string, out decoder, in []byte) { + t.Helper() err := decode(in, out, nil) if err != nil { t.Error("Decoding", name, "failed:", err) @@ -369,6 +371,7 @@ func testDecodable(t *testing.T, name string, out decoder, in []byte) { } func testVersionDecodable(t *testing.T, name string, out versionedDecoder, in []byte, version int16) { + t.Helper() err := versionedDecode(in, out, version, nil) if err != nil { t.Error("Decoding", name, "version", version, "failed:", err)