fix: skip splited buf nodes when nocopy read
joway committed Apr 22, 2024
1 parent 7c443cf commit 40e02ba
Showing 3 changed files with 98 additions and 35 deletions.
14 changes: 8 additions & 6 deletions nocopy.go
@@ -257,12 +257,14 @@ const (
     pagesize  = block8k
     mallocMax = block8k * block1k // mallocMax is 8MB
 
-    minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes
-    defaultLinkBufferMode = reusableMask // default buffer mode is reusable but not readonly
-    // reusable mode indicate to whether reuse buffer node data, default value is true
-    reusableMask uint8 = 1 << 0 // 0000 0001
-    // readonly mode enable by Refer/WriteString/WriteBinary/etc. API, default value is false
-    readonlyMask uint8 = 1 << 1 // 0000 0010
+    minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes
+
+    defaultLinkBufferMode = 0
+    // readonly mode indicates that the buffer node's memory is not controlled by itself,
+    // so we can neither reuse the buffer nor nocopy read it; default value is false.
+    readonlyMask uint8 = 1 << 0 // 0000 0001
+    // nocopyRead mode indicates that the buffer node has already been nocopy read and cannot be reused; default value is false.
+    nocopyReadMask uint8 = 1 << 1 // 0000 0010
 )
 
 // zero-copy slice convert to string
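The new mode bits are manipulated through the node's getMode/setMode helpers (see the setMode hunk below). For orientation, a minimal sketch of how such bitmask accessors are typically written; the receiver and the `mode` field name match the diff, but the bodies are an assumption, not netpoll's verbatim source:

func (node *linkBufferNode) getMode(mask uint8) bool {
    return node.mode&mask > 0
}

func (node *linkBufferNode) setMode(mask uint8, enable bool) {
    if enable {
        node.mode |= mask // set the bits in mask
    } else {
        node.mode &^= mask // clear the bits in mask
    }
}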
24 changes: 14 additions & 10 deletions nocopy_linkbuffer.go
@@ -218,15 +218,18 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) {
 
     // single node
     if b.isSingleNode(n) {
-        // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently
-        // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec
-        // no need to malloc 10 times and the string slice could have the compact memory allocation.
-        if !b.read.getMode(reusableMask) {
-            return b.read.Next(n)
-        }
-        if n >= minReuseBytes && cap(b.read.buf) <= block32k {
-            b.read.setMode(reusableMask, false)
-            return b.read.Next(n)
+        // we cannot nocopy read a readonly mode buffer, since a readonly buffer's memory is not controlled by itself
+        if !b.read.getMode(readonlyMask) {
+            // if readBinary uses no-copy mode, it costs more memory but gives more efficient memory access
+            // for example, if a user's codec needs to decode 10 strings of 100 bytes each, this saves the codec
+            // 10 mallocs, and the string slices get a compact memory layout.
+            if b.read.getMode(nocopyReadMask) {
+                return b.read.Next(n)
+            }
+            if n >= minReuseBytes && cap(b.read.buf) <= block32k {
+                b.read.setMode(nocopyReadMask, true)
+                return b.read.Next(n)
+            }
         }
         // if the underlying buffer too large, we shouldn't use no-copy mode
         p = make([]byte, n)
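The effect of the rewritten branch: a plain writable node may be handed out without copying at most once; the nocopyRead flag then keeps the node from ever being reused, so the returned slice stays valid after the buffer is released. A rough usage sketch (API names appear elsewhere in this commit; the 1 KB size is an illustrative value that satisfies n >= minReuseBytes and cap <= block32k):

buf := NewLinkBuffer()
p, _ := buf.Malloc(1024)
for i := range p {
    p[i] = 'x' // fill the reserved space
}
buf.MallocAck(1024)
buf.Flush()
bt, _ := buf.ReadBinary(1024) // no copy: bt aliases the node's buffer, node marked nocopyRead
_ = buf.Release()             // the node is skipped for reuse, so bt remains valid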
@@ -496,6 +499,7 @@ func (b *UnsafeLinkBuffer) WriteDirect(extra []byte, remainLen int) error {
     dataNode.buf, dataNode.malloc = extra[:0], n
 
     if remainLen > 0 {
+        // split a single buffer node into originNode and newNode
         newNode := newLinkBufferNode(0)
         newNode.off = malloc
         newNode.buf = origin.buf[:malloc]
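For context, WriteDirect splices user-owned bytes into the middle of a pending Malloc reservation: the origin node keeps what has been written so far, a data node wraps the caller's slice, and the new node takes over the remaining reserved space. A hedged sketch of the call pattern, mirroring the test further down (sizes are illustrative):

buf := NewLinkBuffer()
bt, _ := buf.Malloc(8192)      // one large reservation
// ... fill bt[:4096] with payload ...
userBuf := []byte("user-owned data")
buf.WriteDirect(userBuf, 4096) // splice userBuf in; 4096 reserved bytes remain after it
// ... continue writing into bt[4096:], now backed by the split-off new node ...
buf.MallocAck(4096 + len(userBuf) + 16) // ack payload + spliced data + trailing bytes
buf.Flush()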
@@ -836,5 +840,5 @@ func (node *linkBufferNode) setMode(mask uint8, enable bool) {
 }
 
 func (node *linkBufferNode) reusable() bool {
-    return node.mode&reusableMask == 1 && node.mode&readonlyMask == 0
+    return node.mode&(nocopyReadMask|readonlyMask) == 0
 }
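The rewritten check folds both flags into a single mask test: a node may be reused only when neither readonlyMask nor nocopyReadMask is set. A self-contained truth-table check (mask values copied from the diff above):

package main

import "fmt"

const (
    readonlyMask   uint8 = 1 << 0 // 0000 0001
    nocopyReadMask uint8 = 1 << 1 // 0000 0010
)

func main() {
    for mode := uint8(0); mode < 4; mode++ {
        reusable := mode&(nocopyReadMask|readonlyMask) == 0
        fmt.Printf("mode=%02b reusable=%v\n", mode, reusable) // only mode=00 is reusable
    }
}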
95 changes: 76 additions & 19 deletions nocopy_linkbuffer_test.go
@@ -20,6 +20,7 @@ package netpoll
 import (
     "bytes"
     "fmt"
+    "runtime"
     "sync/atomic"
     "testing"
 )
@@ -492,48 +493,104 @@ func TestWriteDirect(t *testing.T) {
 }
 
 func TestNoCopyWriteAndRead(t *testing.T) {
-    // [512bytes] + [512bytes] + [1bytes]
+    // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
+    const (
+        mallocLen = 4096 * 2
+        originLen = 4096
+        dataLen   = 512
+        newLen    = 16
+        normalLen = 4096
+    )
     buf := NewLinkBuffer()
-    userBuf := make([]byte, 512)
+    bt, _ := buf.Malloc(mallocLen)
+    originBuf := bt[:originLen]
+    newBuf := bt[originLen : originLen+newLen]
+
+    // write origin_node
+    for i := 0; i < originLen; i++ {
+        bt[i] = 'a'
+    }
+    // write data_node
+    userBuf := make([]byte, dataLen)
     for i := 0; i < len(userBuf); i++ {
         userBuf[i] = 'b'
     }
-    bt, _ := buf.Malloc(1024)
-    for i := 0; i < 512; i++ {
-        bt[i] = 'a'
+    buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
+    // write new_node
+    for i := 0; i < newLen; i++ {
+        bt[originLen+i] = 'c'
     }
-    buf.WriteDirect(userBuf, 512) // nocopy write
-    bt[512] = 'c'
-    buf.MallocAck(1025)
+    buf.MallocAck(originLen + dataLen + newLen)
+    buf.Flush()
+    // write normal_node
+    normalBuf, _ := buf.Malloc(normalLen)
+    for i := 0; i < normalLen; i++ {
+        normalBuf[i] = 'd'
+    }
     buf.Flush()
-    Equal(t, buf.Len(), 1025)
+    Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)
 
-    bt, _ = buf.ReadBinary(512) // nocopy read
+    // copy read origin_node
+    bt, _ = buf.ReadBinary(originLen)
     for i := 0; i < len(bt); i++ {
         MustTrue(t, bt[i] == 'a')
     }
-    MustTrue(t, !buf.read.reusable() && buf.read.getMode(readonlyMask)) // next read node must be read-only
-    bt, _ = buf.ReadBinary(512) // nocopy read userBuf
+    MustTrue(t, &bt[0] != &originBuf[0])
+    // the next read node is the data node; it must be readonly and non-reusable
+    MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
+    // copy read data_node
+    bt, _ = buf.ReadBinary(dataLen)
     for i := 0; i < len(bt); i++ {
         MustTrue(t, bt[i] == 'b')
     }
-    MustTrue(t, &bt[0] == &userBuf[0])
+    MustTrue(t, &bt[0] != &userBuf[0])
+    // copy read new_node
+    bt, _ = buf.ReadBinary(newLen)
+    for i := 0; i < len(bt); i++ {
+        MustTrue(t, bt[i] == 'c')
+    }
+    MustTrue(t, &bt[0] != &newBuf[0])
+    // the current read node is the new node; it must still be reusable
+    newnode := buf.read
+    t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
+    MustTrue(t, newnode.reusable())
+    var nodeReleased int32
+    runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
+        atomic.AddInt32(&nodeReleased, 1)
+    })
+    // nocopy read normal_node
+    bt, _ = buf.ReadBinary(normalLen)
+    for i := 0; i < len(bt); i++ {
+        MustTrue(t, bt[i] == 'd')
+    }
+    MustTrue(t, &bt[0] == &normalBuf[0])
+    // the normal buffer must never be released while the user still holds it
+    runtime.SetFinalizer(&bt[0], func(_ *byte) {
+        atomic.AddInt32(&nodeReleased, 1)
+    })
+    _ = buf.Release()
+    MustTrue(t, newnode.buf == nil)
+    for atomic.LoadInt32(&nodeReleased) == 0 {
+        runtime.GC()
+        t.Log("newnode not released yet, triggering GC again")
+    }
+    Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
+    runtime.KeepAlive(normalBuf)
 }
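The release assertions above use a finalizer-based pattern to observe when the GC actually reclaims a node's backing array. A distilled, netpoll-independent version of that pattern:

package main

import (
    "runtime"
    "sync/atomic"
)

func main() {
    var released int32
    buf := make([]byte, 4096)
    // the finalizer fires once no live reference to the backing array remains
    runtime.SetFinalizer(&buf[0], func(_ *byte) {
        atomic.AddInt32(&released, 1)
    })
    buf = nil // drop the last reference
    for atomic.LoadInt32(&released) == 0 {
        runtime.GC() // finalizers are queued after GC finds the object unreachable
    }
}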

 func TestBufferMode(t *testing.T) {
     bufnode := newLinkBufferNode(0)
-    MustTrue(t, bufnode.getMode(reusableMask))
+    MustTrue(t, !bufnode.getMode(nocopyReadMask))
     MustTrue(t, bufnode.getMode(readonlyMask))
+    MustTrue(t, !bufnode.reusable())
 
     bufnode = newLinkBufferNode(1)
-    MustTrue(t, bufnode.getMode(reusableMask))
+    MustTrue(t, !bufnode.getMode(nocopyReadMask))
     MustTrue(t, !bufnode.getMode(readonlyMask))
-    bufnode.setMode(reusableMask, false)
-    MustTrue(t, !bufnode.getMode(reusableMask))
-    bufnode.setMode(reusableMask, true)
-    MustTrue(t, bufnode.getMode(reusableMask))
+    bufnode.setMode(nocopyReadMask, false)
+    MustTrue(t, !bufnode.getMode(nocopyReadMask))
+    bufnode.setMode(nocopyReadMask, true)
+    MustTrue(t, bufnode.getMode(nocopyReadMask))
 }

func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) {
