From 40e02ba706150d496d55b0a3db3860fa97d1be27 Mon Sep 17 00:00:00 2001
From: wangzhuowei
Date: Mon, 22 Apr 2024 16:03:02 +0800
Subject: [PATCH] fix: skip split buf nodes when nocopy read

---
 nocopy.go                 | 14 +++---
 nocopy_linkbuffer.go      | 24 +++++-----
 nocopy_linkbuffer_test.go | 95 +++++++++++++++++++++++++++++++--------
 3 files changed, 98 insertions(+), 35 deletions(-)

diff --git a/nocopy.go b/nocopy.go
index f4f75192..6df02e8f 100644
--- a/nocopy.go
+++ b/nocopy.go
@@ -257,12 +257,14 @@ const (
 	pagesize  = block8k
 	mallocMax = block8k * block1k // mallocMax is 8MB
 
-	minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes
-	defaultLinkBufferMode = reusableMask // default buffer mode is reusable but not readonly
-	// reusable mode indicate to whether reuse buffer node data, default value is true
-	reusableMask uint8 = 1 << 0 // 0000 0001
-	// readonly mode enable by Refer/WriteString/WriteBinary/etc. API, default value is false
-	readonlyMask uint8 = 1 << 1 // 0000 0010
+	minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes
+
+	defaultLinkBufferMode = 0
+	// readonly mode indicates that the buffer node's memory is not controlled by itself,
+	// so we can neither reuse the buffer nor nocopy read it; default value is false.
+	readonlyMask uint8 = 1 << 0 // 0000 0001
+	// nocopyRead mode indicates that the buffer node has been nocopy read and its buffer cannot be reused; default value is false.
+	nocopyReadMask uint8 = 1 << 1 // 0000 0010
 )
 
 // zero-copy slice convert to string
diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index 5e038a51..cb48cad1 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -218,15 +218,18 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) {
 
 	// single node
 	if b.isSingleNode(n) {
-		// if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently
-		// for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec
-		// no need to malloc 10 times and the string slice could have the compact memory allocation.
-		if !b.read.getMode(reusableMask) {
-			return b.read.Next(n)
-		}
-		if n >= minReuseBytes && cap(b.read.buf) <= block32k {
-			b.read.setMode(reusableMask, false)
-			return b.read.Next(n)
+		// we cannot nocopy read a readonly mode buffer, since a readonly buffer's memory is not controlled by itself
+		if !b.read.getMode(readonlyMask) {
+			// if readBinary uses no-copy mode, it will use more memory but gain faster memory access;
+			// for example, if a user's codec needs to decode 10 strings of 100 bytes each, no-copy mode
+			// saves the codec 10 mallocs and lets the strings share one compact memory allocation.
+			if b.read.getMode(nocopyReadMask) {
+				return b.read.Next(n)
+			}
+			if n >= minReuseBytes && cap(b.read.buf) <= block32k {
+				b.read.setMode(nocopyReadMask, true)
+				return b.read.Next(n)
+			}
 		}
 		// if the underlying buffer too large, we shouldn't use no-copy mode
 		p = make([]byte, n)
@@ -496,6 +499,7 @@ func (b *UnsafeLinkBuffer) WriteDirect(extra []byte, remainLen int) error {
 	dataNode.buf, dataNode.malloc = extra[:0], n
 
 	if remainLen > 0 {
+		// split a single buffer node into originNode and newNode
 		newNode := newLinkBufferNode(0)
 		newNode.off = malloc
 		newNode.buf = origin.buf[:malloc]
@@ -836,5 +840,5 @@ func (node *linkBufferNode) setMode(mask uint8, enable bool) {
 }
 
 func (node *linkBufferNode) reusable() bool {
-	return node.mode&reusableMask == 1 && node.mode&readonlyMask == 0
+	return node.mode&(nocopyReadMask|readonlyMask) == 0
 }
diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go
index 194dc502..25770762 100644
--- a/nocopy_linkbuffer_test.go
+++ b/nocopy_linkbuffer_test.go
@@ -20,6 +20,7 @@ package netpoll
 import (
 	"bytes"
 	"fmt"
+	"runtime"
 	"sync/atomic"
 	"testing"
 )
@@ -492,48 +493,104 @@ func TestWriteDirect(t *testing.T) {
 }
 
 func TestNoCopyWriteAndRead(t *testing.T) {
-	// [512bytes] + [512bytes] + [1bytes]
+	// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
+	const (
+		mallocLen = 4096 * 2
+		originLen = 4096
+		dataLen   = 512
+		newLen    = 16
+		normalLen = 4096
+	)
 	buf := NewLinkBuffer()
-	userBuf := make([]byte, 512)
+	bt, _ := buf.Malloc(mallocLen)
+	originBuf := bt[:originLen]
+	newBuf := bt[originLen : originLen+newLen]
+
+	// write origin_node
+	for i := 0; i < originLen; i++ {
+		bt[i] = 'a'
+	}
+	// write data_node
+	userBuf := make([]byte, dataLen)
 	for i := 0; i < len(userBuf); i++ {
 		userBuf[i] = 'b'
 	}
-	bt, _ := buf.Malloc(1024)
-	for i := 0; i < 512; i++ {
-		bt[i] = 'a'
+	buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
+	// write new_node
+	for i := 0; i < newLen; i++ {
+		bt[originLen+i] = 'c'
+	}
+	buf.MallocAck(originLen + dataLen + newLen)
+	buf.Flush()
+	// write normal_node
+	normalBuf, _ := buf.Malloc(normalLen)
+	for i := 0; i < normalLen; i++ {
+		normalBuf[i] = 'd'
 	}
-	buf.WriteDirect(userBuf, 512) // nocopy write
-	bt[512] = 'c'
-	buf.MallocAck(1025)
 	buf.Flush()
-	Equal(t, buf.Len(), 1025)
+	Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)
 
-	bt, _ = buf.ReadBinary(512) // nocopy read
+	// copy read origin_node
+	bt, _ = buf.ReadBinary(originLen)
 	for i := 0; i < len(bt); i++ {
 		MustTrue(t, bt[i] == 'a')
 	}
-	MustTrue(t, !buf.read.reusable() && buf.read.getMode(readonlyMask)) // next read node must be read-only
-	bt, _ = buf.ReadBinary(512) // nocopy read userBuf
+	MustTrue(t, &bt[0] != &originBuf[0])
+	// next read node is the data node and must be readonly and non-reusable
+	MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
+	// copy read data_node
+	bt, _ = buf.ReadBinary(dataLen)
 	for i := 0; i < len(bt); i++ {
 		MustTrue(t, bt[i] == 'b')
 	}
-	MustTrue(t, &bt[0] == &userBuf[0])
+	MustTrue(t, &bt[0] != &userBuf[0])
+	// copy read new_node
+	bt, _ = buf.ReadBinary(newLen)
+	for i := 0; i < len(bt); i++ {
+		MustTrue(t, bt[i] == 'c')
+	}
+	MustTrue(t, &bt[0] != &newBuf[0])
+	// current read node is the new node and must still be reusable: newLen < minReuseBytes, so it was copy read
+	newnode := buf.read
+	t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
+	MustTrue(t, newnode.reusable())
+	var nodeReleased int32
+	runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
+		atomic.AddInt32(&nodeReleased, 1)
+	})
+	// nocopy read normal_node
+	bt, _ = buf.ReadBinary(normalLen)
+	for i := 0; i < len(bt); i++ {
+		MustTrue(t, bt[i] == 'd')
+	}
+	MustTrue(t, &bt[0] == &normalBuf[0])
+	// the normal buffer should never be released, since normalBuf still references it
+	runtime.SetFinalizer(&bt[0], func(_ *byte) {
+		atomic.AddInt32(&nodeReleased, 1)
+	})
 	_ = buf.Release()
+	MustTrue(t, newnode.buf == nil)
+	for atomic.LoadInt32(&nodeReleased) == 0 {
+		runtime.GC()
+		t.Log("waiting for newnode buffer to be released")
+	}
+	Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
+	runtime.KeepAlive(normalBuf)
 }
 
 func TestBufferMode(t *testing.T) {
 	bufnode := newLinkBufferNode(0)
-	MustTrue(t, bufnode.getMode(reusableMask))
+	MustTrue(t, !bufnode.getMode(nocopyReadMask))
 	MustTrue(t, bufnode.getMode(readonlyMask))
 	MustTrue(t, !bufnode.reusable())
 
 	bufnode = newLinkBufferNode(1)
-	MustTrue(t, bufnode.getMode(reusableMask))
+	MustTrue(t, !bufnode.getMode(nocopyReadMask))
 	MustTrue(t, !bufnode.getMode(readonlyMask))
-	bufnode.setMode(reusableMask, false)
-	MustTrue(t, !bufnode.getMode(reusableMask))
-	bufnode.setMode(reusableMask, true)
-	MustTrue(t, bufnode.getMode(reusableMask))
+	bufnode.setMode(nocopyReadMask, false)
+	MustTrue(t, !bufnode.getMode(nocopyReadMask))
+	bufnode.setMode(nocopyReadMask, true)
+	MustTrue(t, bufnode.getMode(nocopyReadMask))
 }
 
 func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) {
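
Note on the mode change above: a node's buffer is now recyclable only when it is neither readonly (its memory is not owned by the node) nor already handed out through a nocopy read. Below is a minimal standalone sketch of that bitmask scheme; the mask values and the reusable check mirror the patch, while the setMode/getMode bodies are assumed, since the patch shows only their signatures.

package main

import "fmt"

const (
	readonlyMask   uint8 = 1 << 0 // node does not own its memory
	nocopyReadMask uint8 = 1 << 1 // buffer already escaped via a nocopy read
)

type node struct{ mode uint8 }

// setMode sets or clears one mode bit (assumed implementation).
func (n *node) setMode(mask uint8, enable bool) {
	if enable {
		n.mode |= mask
	} else {
		n.mode &^= mask
	}
}

// getMode reports whether a mode bit is set (assumed implementation).
func (n *node) getMode(mask uint8) bool { return n.mode&mask > 0 }

// reusable mirrors the patched check: recycle only if neither bit is set.
func (n *node) reusable() bool { return n.mode&(nocopyReadMask|readonlyMask) == 0 }

func main() {
	n := &node{}                    // defaultLinkBufferMode = 0
	fmt.Println(n.reusable())       // true
	n.setMode(nocopyReadMask, true) // a nocopy read hands the buffer to the caller
	fmt.Println(n.reusable())       // false: recycling now could corrupt caller data
}

Folding both hazards into one combined-mask test replaces the old check (node.mode&reusableMask == 1 && node.mode&readonlyMask == 0), which tracked "reusable" as its own bit that readBinary had to flip off.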
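Why the split nodes from WriteDirect must be skipped during nocopy reads: originNode and newNode slice the same backing array, so recycling one half while the other has been handed out would rewrite bytes the caller still holds. A toy illustration of that aliasing hazard, with plain slices standing in for buffer nodes:

package main

import "fmt"

func main() {
	backing := make([]byte, 8) // one allocation, like a malloc'd node
	copy(backing, "aaaacccc")

	origin := backing[:4] // originNode's view after the split
	newer := backing[4:]  // newNode's view of the same array

	user := origin[:4] // nocopy read: the caller keeps this slice

	// If the buffer pool reused the backing array for a fresh write...
	copy(newer, "zzzz")
	copy(backing, "!!!!")

	// ...the caller's bytes change underneath them.
	fmt.Printf("caller sees %q, expected %q\n", user, "aaaa")
}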
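The release check in TestNoCopyWriteAndRead is also worth noting as a technique: it plants a finalizer on the first byte of the node's buffer, then loops runtime.GC() until the finalizer fires, proving the buffer really became unreachable after Release. A condensed sketch, under the assumption that nothing else (such as a buffer pool entry) still references the array:

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

func main() {
	var released int32
	buf := make([]byte, 4096)
	// The finalizer fires only once the whole allocation is unreachable.
	runtime.SetFinalizer(&buf[0], func(_ *byte) {
		atomic.AddInt32(&released, 1)
	})
	buf = nil // drop the last reference, as Release does with node.buf
	for atomic.LoadInt32(&released) == 0 {
		runtime.GC() // repeated GCs also drain sync.Pool-style caches, as in the test
	}
	fmt.Println("buffer released")
}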