Skip to content

Commit

Permalink
fix: never reuse buffer node when reset tail node add cap restraint
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Jun 21, 2024
1 parent 2c16b97 commit 84e6a85
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 19 deletions.
42 changes: 42 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,48 @@ func TestConnectionRead(t *testing.T) {
rconn.Close()
}

func TestConnectionNoCopyReadString(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)

var size, cycleTime = 256, 100
var readBucket = make([]string, cycleTime)
var trigger = make(chan struct{})
go func() {
for i := 0; i < cycleTime; i++ {
buf, err := rconn.Reader().ReadString(size)
MustNil(t, err)
Equal(t, len(buf), size)
rconn.Release()
readBucket[i] = buf
trigger <- struct{}{}
}
}()
var msg = make([]byte, size)
for i := 0; i < cycleTime; i++ {
byt := 'a' + byte(i%26)
for c := 0; c < size; c++ {
msg[c] = byt
}
n, err := wconn.Write(msg)
MustNil(t, err)
Equal(t, n, len(msg))
<-trigger
}

for i := 0; i < cycleTime; i++ {
byt := 'a' + byte(i%26)
for _, c := range readBucket[i] {
Equal(t, byte(c), byt)
}
}

wconn.Close()
rconn.Close()
}

func TestConnectionReadAfterClosed(t *testing.T) {
r, w := GetSysFdPairs()
var rconn = &connection{}
Expand Down
26 changes: 8 additions & 18 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,12 +674,9 @@ func (b *UnsafeLinkBuffer) calcMaxSize() (sum int) {
// resetTail will reset tail node or add an empty tail node to
// guarantee the tail node is not larger than 8KB
func (b *UnsafeLinkBuffer) resetTail(maxSize int) {
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if maxSize <= pagesize {
b.write.Reset()
return
}

// set nil tail
b.write.next = newLinkBufferNode(0)
b.write = b.write.next
Expand Down Expand Up @@ -748,6 +745,7 @@ func (b *UnsafeLinkBuffer) growth(n int) {
}

// isSingleNode determines whether reading needs to cross nodes.
// isSingleNode will move b.read to latest non-empty node if there is a zero-size node
// Must require b.Len() > 0
func (b *UnsafeLinkBuffer) isSingleNode(readN int) (single bool) {
if readN <= 0 {
Expand Down Expand Up @@ -818,29 +816,20 @@ func (node *linkBufferNode) IsEmpty() (ok bool) {
return node.off == len(node.buf)
}

func (node *linkBufferNode) Reset() {
if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 {
return
}
node.off, node.malloc = 0, 0
node.buf = node.buf[:0]
return
}

func (node *linkBufferNode) Next(n int) (p []byte) {
off := node.off
node.off += n
return node.buf[off:node.off]
return node.buf[off:node.off:node.off]
}

func (node *linkBufferNode) Peek(n int) (p []byte) {
return node.buf[node.off : node.off+n]
return node.buf[node.off : node.off+n : node.off+n]
}

func (node *linkBufferNode) Malloc(n int) (buf []byte) {
malloc := node.malloc
node.malloc += n
return node.buf[malloc:node.malloc]
return node.buf[malloc:node.malloc:node.malloc]
}

// Refer holds a reference count at the same time as Next, and releases the real buffer after Release.
Expand Down Expand Up @@ -878,17 +867,18 @@ func (node *linkBufferNode) Release() (err error) {
}

func (node *linkBufferNode) getMode(mask uint8) bool {
return node.mode&mask > 0
return (node.mode & mask) > 0
}

func (node *linkBufferNode) setMode(mask uint8, enable bool) {
if enable {
node.mode = node.mode | mask
} else {
node.mode = node.mode & ^mask
node.mode = node.mode &^ mask
}
}

// only non-readonly and copied-read node should be reusable
func (node *linkBufferNode) reusable() bool {
return node.mode&(nocopyReadMask|readonlyMask) == 0
return node.mode&(readonlyMask|nocopyReadMask) == 0
}
4 changes: 3 additions & 1 deletion nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
MustTrue(t, newnode.buf == nil)
for atomic.LoadInt32(&nodeReleased) == 0 {
runtime.GC()
t.Log("newnode release check failed")
t.Log("newnode release checking")
}
Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
runtime.KeepAlive(normalBuf)
Expand All @@ -620,8 +620,10 @@ func TestLinkBufferBufferMode(t *testing.T) {
MustTrue(t, !bufnode.getMode(readonlyMask))
bufnode.setMode(nocopyReadMask, false)
MustTrue(t, !bufnode.getMode(nocopyReadMask))
MustTrue(t, bufnode.reusable())
bufnode.setMode(nocopyReadMask, true)
MustTrue(t, bufnode.getMode(nocopyReadMask))
MustTrue(t, !bufnode.reusable())
}

func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) {
Expand Down

0 comments on commit 84e6a85

Please sign in to comment.