Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool for encoders #1584

Merged
merged 12 commits into from
Jan 20, 2025
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Supported pool of encoders, which implement ResetableWriter interface

## v3.97.0
* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation

Expand Down
110 changes: 95 additions & 15 deletions internal/topic/topicwriterinternal/encoders.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package topicwriterinternal

import (
"bytes"
"compress/gzip"
"fmt"
"io"
Expand All @@ -17,36 +18,115 @@ const (
codecUnknown = rawtopiccommon.CodecUNSPECIFIED
)

type EncoderMap struct {
type PublicResetableWriter interface {
io.WriteCloser
Reset(wr io.Writer)
}

type encoderPool struct {
pool sync.Pool
}

func (p *encoderPool) Get() PublicResetableWriter {
enc, _ := p.pool.Get().(PublicResetableWriter)

return enc
}

func (p *encoderPool) Put(wr PublicResetableWriter) {
p.pool.Put(wr)
}

func newEncoderPool() *encoderPool {
return &encoderPool{
pool: sync.Pool{},
}
}

type MultiEncoder struct {
m map[rawtopiccommon.Codec]PublicCreateEncoderFunc

bp xsync.Pool[bytes.Buffer]
ep map[rawtopiccommon.Codec]*encoderPool
}

func NewEncoderMap() *EncoderMap {
return &EncoderMap{
m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{
rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
},
rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
func NewMultiEncoder() *MultiEncoder {
me := &MultiEncoder{
m: make(map[rawtopiccommon.Codec]PublicCreateEncoderFunc),
bp: xsync.Pool[bytes.Buffer]{
New: func() *bytes.Buffer {
return &bytes.Buffer{}
},
},
ep: make(map[rawtopiccommon.Codec]*encoderPool),
}

me.AddEncoder(rawtopiccommon.CodecRaw, func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
})
me.AddEncoder(rawtopiccommon.CodecGzip, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})

return me
}

func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
e.m[codec] = creator
e.ep[codec] = newEncoderPool()
}

func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error) {
buf := e.bp.GetOrNew()
defer e.bp.Put(buf)

buf.Reset()
if _, err := buf.ReadFrom(source); err != nil {
return 0, err
}

return e.EncodeBytes(codec, target, buf.Bytes())
}

func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) {
enc, err := e.createEncodeWriter(codec, target)
if err != nil {
return 0, err
}

n, err := enc.Write(data)
if err == nil {
err = enc.Close()
}
if err != nil {
return 0, err
}

if resetableEnc, ok := enc.(PublicResetableWriter); ok {
e.ep[codec].Put(resetableEnc)
}

return n, nil
}

func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
if ePool, ok := e.ep[codec]; ok {
wr := ePool.Get()
if wr != nil {
wr.Reset(target)

return wr, nil
}
}

if encoderCreator, ok := e.m[codec]; ok {
return encoderCreator(target)
}

return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec)))
}

func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
func (e *MultiEncoder) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
res := make(rawtopiccommon.SupportedCodecs, 0, len(e.m))
for codec := range e.m {
res = append(res, codec)
Expand All @@ -55,7 +135,7 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
return res
}

func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool {
func (e *MultiEncoder) IsSupported(codec rawtopiccommon.Codec) bool {
_, ok := e.m[codec]

return ok
Expand All @@ -73,7 +153,7 @@ func (nopWriteCloser) Close() error {

// EncoderSelector not thread safe
type EncoderSelector struct {
m *EncoderMap
m *MultiEncoder

tracer *trace.Topic
writerReconnectorID string
Expand All @@ -87,7 +167,7 @@ type EncoderSelector struct {
}

func NewEncoderSelector(
m *EncoderMap,
m *MultiEncoder,
allowedCodecs rawtopiccommon.SupportedCodecs,
parallelCompressors int,
tracer *trace.Topic,
Expand Down
91 changes: 90 additions & 1 deletion internal/topic/topicwriterinternal/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package topicwriterinternal

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
"testing"

Expand All @@ -20,7 +23,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) {
})
t.Run("One", func(t *testing.T) {
s := NewEncoderSelector(
NewEncoderMap(),
NewMultiEncoder(),
rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw},
1,
&trace.Topic{},
Expand Down Expand Up @@ -206,3 +209,89 @@ func TestCompressMessages(t *testing.T) {
require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount))
})
}

func TestMultiEncoder(t *testing.T) {
decompressGzip := func(rd io.Reader) string {
gzreader, err := gzip.NewReader(rd)
require.NoError(t, err)

decompressedMsg, err := io.ReadAll(gzreader)
require.NoError(t, err)

return string(decompressedMsg)
}

t.Run("BuffersPool", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("test_data_%d", i))

buf.Reset()
_, err := testMultiEncoder.Encode(rawtopiccommon.CodecGzip, buf, bytes.NewReader(testMsg))
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}
})

t.Run("NotResetableWriter", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()
require.Len(t, testMultiEncoder.ep, 2)

buf := &bytes.Buffer{}
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecRaw, buf, []byte("test_data"))
require.NoError(t, err)
require.Equal(t, "test_data", buf.String())
})

t.Run("ResetableWriterCustom", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

customCodecCode := rawtopiccommon.Codec(10001)
testMultiEncoder.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})
require.Len(t, testMultiEncoder.ep, 3)

buf := &bytes.Buffer{}
_, err := testMultiEncoder.EncodeBytes(customCodecCode, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriter", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriterManyMessages", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("test_data_%d", i))

buf.Reset()
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, testMsg)
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}
})
}
30 changes: 6 additions & 24 deletions internal/topic/topicwriterinternal/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type messageWithDataContent struct {
bufCodec rawtopiccommon.Codec
bufEncoded bytes.Buffer
rawBuf bytes.Buffer
encoders *EncoderMap
encoders *MultiEncoder
BufUncompressedSize int
}

Expand Down Expand Up @@ -111,18 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([

m.bufEncoded.Reset()

writer, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded)
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed create encoder for message, codec '%v': %w",
codec,
err,
)))
}
_, err = writer.Write(m.rawBuf.Bytes())
if err == nil {
err = writer.Close()
}
_, err := m.encoders.EncodeBytes(codec, &m.bufEncoded, m.rawBuf.Bytes())
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed to compress message, codec '%v': %w",
Expand Down Expand Up @@ -157,27 +146,20 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code
m.bufCodec = codec
m.bufEncoded.Reset()

encoder, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded)
if err != nil {
return err
}

reader := m.Data
if reader == nil {
reader = &bytes.Reader{}
}
bytesCount, err := io.Copy(encoder, reader)
if err == nil {
err = encoder.Close()
}

bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, reader)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed compress message with codec '%v': %w",
codec,
err,
)))
}
m.BufUncompressedSize = int(bytesCount)
m.BufUncompressedSize = bytesCount
m.Data = nil

return nil
Expand Down Expand Up @@ -218,7 +200,7 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([]

func newMessageDataWithContent(
message PublicMessage, //nolint:gocritic
encoders *EncoderMap,
encoders *MultiEncoder,
) messageWithDataContent {
return messageWithDataContent{
PublicMessage: message,
Expand Down
10 changes: 5 additions & 5 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type WriterReconnector struct {
semaphore *semaphore.Weighted
firstInitResponseProcessedChan empty.Chan
lastSeqNo int64
encodersMap *EncoderMap
encodersMap *MultiEncoder
initDoneCh empty.Chan
initInfo InitialInfo
m xsync.RWMutex
Expand Down Expand Up @@ -156,7 +156,7 @@ func newWriterReconnectorStopped(
queue: newMessageQueue(),
lastSeqNo: -1,
firstInitResponseProcessedChan: make(empty.Chan),
encodersMap: NewEncoderMap(),
encodersMap: NewMultiEncoder(),
writerInstanceID: writerInstanceID.String(),
retrySettings: cfg.RetrySettings,
}
Expand Down Expand Up @@ -760,11 +760,11 @@ func createRawMessageData(
return res, err
}

func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *EncoderMap,
func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *MultiEncoder,
serverCodecs rawtopiccommon.SupportedCodecs,
) rawtopiccommon.SupportedCodecs {
if forceCodec != rawtopiccommon.CodecUNSPECIFIED {
if serverCodecs.AllowedByCodecsList(forceCodec) && encoderMap.IsSupported(forceCodec) {
if serverCodecs.AllowedByCodecsList(forceCodec) && multiEncoder.IsSupported(forceCodec) {
return rawtopiccommon.SupportedCodecs{forceCodec}
}

Expand All @@ -779,7 +779,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *Encoder

res := make(rawtopiccommon.SupportedCodecs, 0, len(serverCodecs))
for _, codec := range serverCodecs {
if encoderMap.IsSupported(codec) {
if multiEncoder.IsSupported(codec) {
res = append(res, codec)
}
}
Expand Down
Loading
Loading