Skip to content

Commit

Permalink
fix: add rwmx
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Evdokimov committed Jan 16, 2025
1 parent 6c15230 commit 48c7102
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions internal/topic/topicwriterinternal/encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newEncoderPool() *encoderPool {
type EncoderMap struct {
m map[rawtopiccommon.Codec]PublicCreateEncoderFunc

mx sync.Mutex
mx sync.RWMutex
p map[rawtopiccommon.Codec]*encoderPool
}

Expand All @@ -59,7 +59,7 @@ func NewEncoderMap() *EncoderMap {
return gzip.NewWriter(writer), nil
},
},
mx: sync.Mutex{},
mx: sync.RWMutex{},
p: make(map[rawtopiccommon.Codec]*encoderPool),
}
}
Expand All @@ -84,19 +84,24 @@ func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data [

resetableEnc, ok := enc.(PublicResetableWriter)
if ok {
e.mx.Lock()
if _, ok := e.p[codec]; !ok {
e.mx.Lock()
e.p[codec] = newEncoderPool()
e.mx.Unlock()
}
e.mx.Unlock()

e.mx.RLock()
e.p[codec].Put(resetableEnc)
e.mx.RUnlock()
}

return n, nil
}

func (e *EncoderMap) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
e.mx.RLock()
defer e.mx.RUnlock()

if ePool, ok := e.p[codec]; ok {
wr := ePool.Get()
if wr != nil {
Expand Down

0 comments on commit 48c7102

Please sign in to comment.