diff --git a/nocopy.go b/nocopy.go
index 80df5f9b..6df02e8f 100644
--- a/nocopy.go
+++ b/nocopy.go
@@ -16,6 +16,10 @@ package netpoll

 import (
 	"io"
+	"reflect"
+	"unsafe"
+
+	"github.com/bytedance/gopkg/lang/mcache"
 )

 // Reader is a collection of operations for nocopy reads.
@@ -108,9 +112,9 @@ type Reader interface {
 // The usage of the design is a two-step operation, first apply for a section of memory,
 // fill it and then submit. E.g:
 //
-//  var buf, _ = Malloc(n)
-//  buf = append(buf[:0], ...)
-//  Flush()
+//	var buf, _ = Malloc(n)
+//	buf = append(buf[:0], ...)
+//	Flush()
 //
 // Note that it is not recommended to submit self-managed buffers to Writer.
 // Since the writer is processed asynchronously, if the self-managed buffer is used and recycled after submission,
@@ -244,10 +248,52 @@ func NewIOReadWriter(rw ReadWriter) io.ReadWriter {
 }

 const (
-	block1k = 1 * 1024
-	block2k = 2 * 1024
-	block4k = 4 * 1024
-	block8k = 8 * 1024
+	block1k  = 1 * 1024
+	block2k  = 2 * 1024
+	block4k  = 4 * 1024
+	block8k  = 8 * 1024
+	block32k = 32 * 1024
+
+	pagesize  = block8k
+	mallocMax = block8k * block1k // mallocMax is 8MB
+
+	minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes
+
+	defaultLinkBufferMode = 0
+	// readonly mode indicates that the buffer node memory is not controlled by itself,
+	// so we cannot reuse the buffer or nocopy read it; the default value is false.
+	readonlyMask uint8 = 1 << 0 // 0000 0001
+	// nocopyRead mode indicates that the buffer node has been nocopy-read, so the buffer cannot be reused; the default value is false.
+	nocopyReadMask uint8 = 1 << 1 // 0000 0010
 )

-const pagesize = block8k
+// zero-copy slice convert to string
+func unsafeSliceToString(b []byte) string {
+	return *(*string)(unsafe.Pointer(&b))
+}
+
+// zero-copy string convert to slice
+func unsafeStringToSlice(s string) (b []byte) {
+	p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
+	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	hdr.Data = uintptr(p)
+	hdr.Cap = len(s)
+	hdr.Len = len(s)
+	return b
+}
+
+// malloc limits the cap of the buffer from mcache.
+func malloc(size, capacity int) []byte {
+	if capacity > mallocMax {
+		return make([]byte, size, capacity)
+	}
+	return mcache.Malloc(size, capacity)
+}
+
+// free limits the cap of the buffer from mcache.
+func free(buf []byte) {
+	if cap(buf) > mallocMax {
+		return
+	}
+	mcache.Free(buf)
+}
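The two `unsafe` converters moved into nocopy.go above alias memory instead of copying it: the resulting string shares the slice's backing array. A standalone sketch of the consequence (illustrative, not part of the patch; the converter body is copied verbatim from the diff):

```go
package main

import (
	"fmt"
	"unsafe"
)

// zero-copy slice-to-string, exactly as in nocopy.go: the string header
// points at the slice's backing array, so no bytes are copied.
func unsafeSliceToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	buf := []byte("hello")
	s := unsafeSliceToString(buf)
	buf[0] = 'j'   // mutating the slice is visible through the "immutable" string
	fmt.Println(s) // prints "jello"
}
```

This is why the read paths below must guarantee a node is neither recycled nor rewritten while such a string is still reachable.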
diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index c888c3c8..cb48cad1 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -1,4 +1,4 @@
-// Copyright 2022 CloudWeGo Authors
+// Copyright 2024 CloudWeGo Authors
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,21 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//go:build !race
-// +build !race
-
 package netpoll

 import (
 	"bytes"
 	"errors"
 	"fmt"
-	"reflect"
 	"sync"
 	"sync/atomic"
-	"unsafe"
-
-	"github.com/bytedance/gopkg/lang/mcache"
 )

 // BinaryInplaceThreshold marks the minimum value of the nocopy slice length,
@@ -36,6 +29,9 @@ const BinaryInplaceThreshold = block4k
 // LinkBufferCap that can be modified marks the minimum value of each node of LinkBuffer.
 var LinkBufferCap = block4k

+var _ Reader = &LinkBuffer{}
+var _ Writer = &LinkBuffer{}
+
 // NewLinkBuffer size defines the initial capacity, but there is no readable data.
 func NewLinkBuffer(size ...int) *LinkBuffer {
 	var buf = &LinkBuffer{}
@@ -48,8 +44,8 @@ func NewLinkBuffer(size ...int) *LinkBuffer {
 	return buf
 }

-// LinkBuffer implements ReadWriter.
-type LinkBuffer struct {
+// UnsafeLinkBuffer implements ReadWriter.
+type UnsafeLinkBuffer struct {
 	length     int64
 	mallocSize int

@@ -61,24 +57,21 @@ type LinkBuffer struct {
 	caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
 }

-var _ Reader = &LinkBuffer{}
-var _ Writer = &LinkBuffer{}
-
 // Len implements Reader.
-func (b *LinkBuffer) Len() int {
+func (b *UnsafeLinkBuffer) Len() int {
 	l := atomic.LoadInt64(&b.length)
 	return int(l)
 }

 // IsEmpty check if this LinkBuffer is empty.
-func (b *LinkBuffer) IsEmpty() (ok bool) {
+func (b *UnsafeLinkBuffer) IsEmpty() (ok bool) {
 	return b.Len() == 0
 }

 // ------------------------------------------ implement zero-copy reader ------------------------------------------

 // Next implements Reader.
-func (b *LinkBuffer) Next(n int) (p []byte, err error) {
+func (b *UnsafeLinkBuffer) Next(n int) (p []byte, err error) {
 	if n <= 0 {
 		return
 	}
@@ -117,7 +110,7 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {

 // Peek does not have an independent lifecycle, and there is no signal to
 // indicate that Peek content can be released, so Peek will not introduce mcache for now.
-func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
+func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) {
 	if n <= 0 {
 		return
 	}
@@ -154,7 +147,7 @@ func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
 }

 // Skip implements Reader.
-func (b *LinkBuffer) Skip(n int) (err error) {
+func (b *UnsafeLinkBuffer) Skip(n int) (err error) {
 	if n <= 0 {
 		return
 	}
@@ -178,7 +171,7 @@ func (b *LinkBuffer) Skip(n int) (err error) {

 // Release the node that has been read.
 // b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice
-func (b *LinkBuffer) Release() (err error) {
+func (b *UnsafeLinkBuffer) Release() (err error) {
 	for b.read != b.flush && b.read.Len() == 0 {
 		b.read = b.read.next
 	}
@@ -196,7 +189,7 @@ func (b *LinkBuffer) Release() (err error) {
 }

 // ReadString implements Reader.
-func (b *LinkBuffer) ReadString(n int) (s string, err error) {
+func (b *UnsafeLinkBuffer) ReadString(n int) (s string, err error) {
 	if n <= 0 {
 		return
 	}
@@ -208,7 +201,7 @@ func (b *LinkBuffer) ReadString(n int) (s string, err error) {
 }

 // ReadBinary implements Reader.
-func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) {
+func (b *UnsafeLinkBuffer) ReadBinary(n int) (p []byte, err error) {
 	if n <= 0 {
 		return
 	}
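For orientation while reading the renames above: `Next` consumes bytes, `Peek` inspects without consuming, and `Skip` discards. A small length-prefixed-frame parser written against the `Reader` interface from nocopy.go (the frame layout is invented, purely illustrative):

```go
package example

import (
	"encoding/binary"
	"errors"

	"github.com/cloudwego/netpoll"
)

// parseFrame decodes a hypothetical frame: a 4-byte big-endian length
// followed by the payload. Peek lets us look at the header without
// consuming it, in case the payload has not fully arrived yet.
func parseFrame(r netpoll.Reader) ([]byte, error) {
	hdr, err := r.Peek(4) // does not advance the read pointer
	if err != nil {
		return nil, err
	}
	size := int(binary.BigEndian.Uint32(hdr))
	if r.Len() < 4+size {
		return nil, errors.New("frame incomplete, wait for more data")
	}
	_ = r.Skip(4)       // drop the header
	return r.Next(size) // payload view, valid until Release()
}
```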
@@ -220,15 +213,30 @@ func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) {
 }

 // readBinary cannot use mcache, because the memory allocated by readBinary will not be recycled.
-func (b *LinkBuffer) readBinary(n int) (p []byte) {
+func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) {
 	b.recalLen(-n) // re-cal length

 	// single node
-	p = make([]byte, n)
 	if b.isSingleNode(n) {
+		// we cannot nocopy read a readonly mode buffer, since a readonly buffer's memory is not controlled by itself
+		if !b.read.getMode(readonlyMask) {
+			// If readBinary uses nocopy mode, more memory is held, but memory access is more efficient.
+			// For example, if a user's codec needs to decode 10 strings of 100 bytes each, this saves the codec
+			// 10 separate mallocs and gives the string slice a compact memory layout.
+			if b.read.getMode(nocopyReadMask) {
+				return b.read.Next(n)
+			}
+			if n >= minReuseBytes && cap(b.read.buf) <= block32k {
+				b.read.setMode(nocopyReadMask, true)
+				return b.read.Next(n)
+			}
+		}
+		// if the underlying buffer is too large, we shouldn't use nocopy mode
+		p = make([]byte, n)
 		copy(p, b.read.Next(n))
 		return p
 	}
+	p = make([]byte, n)
 	// multiple nodes
 	var pIdx int
 	var l int
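The branch structure above encodes a three-way policy; restated as a single predicate for clarity (a paraphrase of the patch's logic, not code from it):

```go
// shouldAliasOnRead mirrors readBinary's single-node fast path:
//   - a readonly node does not own its memory, so its bytes must be copied out;
//   - a node already marked nocopyRead is excluded from reuse anyway,
//     so further reads may alias it freely;
//   - otherwise, alias only if the read is large enough to be worth it
//     (n >= minReuseBytes) and the node is small enough (cap <= block32k)
//     that sacrificing buffer reuse wastes little memory.
func shouldAliasOnRead(node *linkBufferNode, n int) bool {
	if node.getMode(readonlyMask) {
		return false
	}
	if node.getMode(nocopyReadMask) {
		return true
	}
	return n >= minReuseBytes && cap(node.buf) <= block32k
}
```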
@@ -247,7 +255,7 @@ func (b *LinkBuffer) readBinary(n int) (p []byte) {
 }

 // ReadByte implements Reader.
-func (b *LinkBuffer) ReadByte() (p byte, err error) {
+func (b *UnsafeLinkBuffer) ReadByte() (p byte, err error) {
 	// check whether enough or not.
 	if b.Len() < 1 {
 		return p, errors.New("link buffer read byte is empty")
@@ -262,7 +270,7 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
 }

 // Until returns a slice ends with the delim in the buffer.
-func (b *LinkBuffer) Until(delim byte) (line []byte, err error) {
+func (b *UnsafeLinkBuffer) Until(delim byte) (line []byte, err error) {
 	n := b.indexByte(delim, 0)
 	if n < 0 {
 		return nil, fmt.Errorf("link buffer read slice cannot find: '%b'", delim)
@@ -274,7 +282,7 @@ func (b *LinkBuffer) Until(delim byte) (line []byte, err error) {
 // and only holds the ability of Reader.
 //
 // Slice will automatically execute a Release.
-func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
+func (b *UnsafeLinkBuffer) Slice(n int) (r Reader, err error) {
 	if n <= 0 {
 		return NewLinkBuffer(0), nil
 	}
@@ -285,9 +293,9 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
 	b.recalLen(-n) // re-cal length

 	// just use for range
-	p := &LinkBuffer{
-		length: int64(n),
-	}
+	p := new(LinkBuffer)
+	p.length = int64(n)
+
 	defer func() {
 		// set to read-only
 		p.flush = p.flush.next
@@ -324,7 +332,7 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) {

 // ------------------------------------------ implement zero-copy writer ------------------------------------------

 // Malloc pre-allocates memory, which is not readable, and becomes readable data after submission(e.g. Flush).
-func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {
+func (b *UnsafeLinkBuffer) Malloc(n int) (buf []byte, err error) {
 	if n <= 0 {
 		return
 	}
@@ -334,12 +342,12 @@ func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {
 }

 // MallocLen implements Writer.
-func (b *LinkBuffer) MallocLen() (length int) {
+func (b *UnsafeLinkBuffer) MallocLen() (length int) {
 	return b.mallocSize
 }

 // MallocAck will keep the first n malloc bytes and discard the rest.
-func (b *LinkBuffer) MallocAck(n int) (err error) {
+func (b *UnsafeLinkBuffer) MallocAck(n int) (err error) {
 	if n < 0 {
 		return fmt.Errorf("link buffer malloc ack[%d] invalid", n)
 	}
@@ -363,7 +371,7 @@ func (b *LinkBuffer) MallocAck(n int) (err error) {
 }

 // Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
-func (b *LinkBuffer) Flush() (err error) {
+func (b *UnsafeLinkBuffer) Flush() (err error) {
 	b.mallocSize = 0
 	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
 	if cap(b.write.buf) > pagesize {
@@ -385,7 +393,7 @@ func (b *LinkBuffer) Flush() (err error) {
 }

 // Append implements Writer.
-func (b *LinkBuffer) Append(w Writer) (err error) {
+func (b *UnsafeLinkBuffer) Append(w Writer) (err error) {
 	var buf, ok = w.(*LinkBuffer)
 	if !ok {
 		return errors.New("unsupported writer which is not LinkBuffer")
@@ -396,7 +404,7 @@ func (b *LinkBuffer) Append(w Writer) (err error) {

 // WriteBuffer will not submit(e.g. Flush) data to ensure normal use of MallocLen.
 // you must actively submit before read the data.
 // The argument buf can't be used after calling WriteBuffer. (set it to nil)
-func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
+func (b *UnsafeLinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
 	if buf == nil {
 		return
 	}
@@ -435,7 +443,7 @@ func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
 }

 // WriteString implements Writer.
-func (b *LinkBuffer) WriteString(s string) (n int, err error) {
+func (b *UnsafeLinkBuffer) WriteString(s string) (n int, err error) {
 	if len(s) == 0 {
 		return
 	}
@@ -444,7 +452,7 @@ func (b *LinkBuffer) WriteString(s string) (n int, err error) {
 }

 // WriteBinary implements Writer.
-func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
+func (b *UnsafeLinkBuffer) WriteBinary(p []byte) (n int, err error) {
 	n = len(p)
 	if n == 0 {
 		return
@@ -466,8 +474,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
 }

 // WriteDirect cannot be mixed with WriteString or WriteBinary functions.
-func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
-	n := len(p)
+func (b *UnsafeLinkBuffer) WriteDirect(extra []byte, remainLen int) error {
+	n := len(extra)
 	if n == 0 || remainLen < 0 {
 		return nil
 	}
@@ -479,20 +487,26 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
 		origin = origin.next
 	}
 	// Add the buf length of the original node
+	// `malloc` is the offset within the origin buffer that has already been malloced; the extra buffer should be inserted after that offset.
 	malloc += len(origin.buf)

 	// Create dataNode and newNode and insert them into the chain
-	dataNode := newLinkBufferNode(0)
-	dataNode.buf, dataNode.malloc = p[:0], n
+	// dataNode wraps the user buffer extra, and newNode wraps the remaining origin netpoll buffer
+	// - originNode{buf=origin, off=0, malloc=malloc, readonly=true} : non-reusable
+	// - dataNode{buf=extra, off=0, malloc=len(extra), readonly=true} : non-reusable
+	// - newNode{buf=origin, off=malloc, malloc=origin.malloc, readonly=false} : reusable
+	dataNode := newLinkBufferNode(0) // a zero-size node is created in readonly mode
+	dataNode.buf, dataNode.malloc = extra[:0], n

 	if remainLen > 0 {
+		// split the single buffer node into originNode and newNode
 		newNode := newLinkBufferNode(0)
 		newNode.off = malloc
 		newNode.buf = origin.buf[:malloc]
 		newNode.malloc = origin.malloc
-		newNode.readonly = false
+		newNode.setMode(readonlyMask, false)
 		origin.malloc = malloc
-		origin.readonly = true
+		origin.setMode(readonlyMask, true)

 		// link nodes
 		dataNode.next = newNode
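The comments above describe the node split; from the caller's side, `WriteDirect` is used to splice a user-owned payload into the middle of a reservation made by `Malloc`. A usage sketch in the spirit of the new `TestNoCopyWriteAndRead` test further down (sizes and the function name are invented):

```go
package example

import "github.com/cloudwego/netpoll"

// writeFrame reserves space for a 16B header and an 8B trailer, and splices
// the user-owned payload between them without copying it.
func writeFrame(payload []byte) *netpoll.LinkBuffer {
	buf := netpoll.NewLinkBuffer()
	bt, _ := buf.Malloc(128)                 // reserve header + trailer space
	copy(bt[:16], "0123456789abcdef")        // fill the 16-byte header region
	_ = buf.WriteDirect(payload, 128-16)     // splice payload; 112 reserved bytes remain after it
	copy(bt[16:24], "01234567")              // keep filling the reservation (lands after the splice)
	_ = buf.MallocAck(16 + len(payload) + 8) // keep header + payload + trailer, discard the rest
	_ = buf.Flush()                          // buf.Len() == 24+len(payload)
	return buf
}
```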
@@ -514,7 +528,7 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
 }

 // WriteByte implements Writer.
-func (b *LinkBuffer) WriteByte(p byte) (err error) {
+func (b *UnsafeLinkBuffer) WriteByte(p byte) (err error) {
 	dst, err := b.Malloc(1)
 	if len(dst) == 1 {
 		dst[0] = p
@@ -523,7 +537,7 @@ func (b *LinkBuffer) WriteByte(p byte) (err error) {
 }

 // Close will recycle all buffer.
-func (b *LinkBuffer) Close() (err error) {
+func (b *UnsafeLinkBuffer) Close() (err error) {
 	atomic.StoreInt64(&b.length, 0)
 	b.mallocSize = 0
 	// just release all
@@ -540,7 +554,7 @@ func (b *LinkBuffer) Close() (err error) {

 // ------------------------------------------ implement connection interface ------------------------------------------

 // Bytes returns all the readable bytes of this LinkBuffer.
-func (b *LinkBuffer) Bytes() []byte {
+func (b *UnsafeLinkBuffer) Bytes() []byte {
 	node, flush := b.read, b.flush
 	if node == flush {
 		return node.buf[node.off:]
@@ -558,7 +572,7 @@ func (b *LinkBuffer) Bytes() []byte {

 // GetBytes will read and fill the slice p as much as possible.
 // If p is not passed, return all readable bytes.
-func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
+func (b *UnsafeLinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
 	node, flush := b.read, b.flush
 	if len(p) == 0 {
 		n := 0
@@ -588,7 +602,7 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
 // maxSize: The maximum size of data between two Release(). In some cases, this can
 //
 //	guarantee all data allocated in one node to reduce copy.
-func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
+func (b *UnsafeLinkBuffer) book(bookSize, maxSize int) (p []byte) {
 	l := cap(b.write.buf) - b.write.malloc
 	// grow linkBuffer
 	if l == 0 {
@@ -605,7 +619,7 @@ func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {

 // bookAck will ack the first n malloc bytes and discard the rest.
 //
 // length: The size of data in inputBuffer. It is used to calculate the maxSize
-func (b *LinkBuffer) bookAck(n int) (length int, err error) {
+func (b *UnsafeLinkBuffer) bookAck(n int) (length int, err error) {
 	b.write.malloc = n + len(b.write.buf)
 	b.write.buf = b.write.buf[:b.write.malloc]
 	b.flush = b.write
@@ -616,7 +630,7 @@ func (b *LinkBuffer) bookAck(n int) (length int, err error) {
 }

 // calcMaxSize will calculate the data size between two Release()
-func (b *LinkBuffer) calcMaxSize() (sum int) {
+func (b *UnsafeLinkBuffer) calcMaxSize() (sum int) {
 	for node := b.head; node != b.read; node = node.next {
 		sum += len(node.buf)
 	}
@@ -624,8 +638,24 @@ func (b *LinkBuffer) calcMaxSize() (sum int) {
 	return sum
 }

+// resetTail will reset the tail node or add an empty tail node to
+// guarantee the tail node is not larger than 8KB
+func (b *UnsafeLinkBuffer) resetTail(maxSize int) {
+	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
+	if maxSize <= pagesize {
+		b.write.Reset()
+		return
+	}
+
+	// set nil tail
+	b.write.next = newLinkBufferNode(0)
+	b.write = b.write.next
+	b.flush = b.write
+	return
+}
+
 // indexByte returns the index of the first instance of c in buffer, or -1 if c is not present in buffer.
-func (b *LinkBuffer) indexByte(c byte, skip int) int {
+func (b *UnsafeLinkBuffer) indexByte(c byte, skip int) int {
 	size := b.Len()
 	if skip >= size {
 		return -1
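Note the division of labor being preserved here: `Bytes` flattens multi-node content into one slice, while `GetBytes` hands back one zero-copy view per node, which is the shape vectored I/O wants. A hedged illustration of draining a buffer through `GetBytes` (not how netpoll itself flushes to a connection; just the API shape):

```go
package example

import (
	"io"

	"github.com/cloudwego/netpoll"
)

// drainTo gather-writes the readable bytes without flattening them first,
// then consumes them and releases the nodes that have been read.
func drainTo(w io.Writer, buf *netpoll.LinkBuffer) error {
	total := buf.Len()
	for _, chunk := range buf.GetBytes(nil) { // one []byte view per node, no copy
		if _, err := w.Write(chunk); err != nil {
			return err
		}
	}
	_ = buf.Skip(total) // GetBytes does not consume; advance past what was written
	return buf.Release()
}
```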
@@ -656,25 +686,41 @@ func (b *LinkBuffer) indexByte(c byte, skip int) int {
 	return -1
 }

-// resetTail will reset tail node or add an empty tail node to
-// guarantee the tail node is not larger than 8KB
-func (b *LinkBuffer) resetTail(maxSize int) {
-	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
-	if maxSize <= pagesize {
-		b.write.Reset()
+// ------------------------------------------ private function ------------------------------------------
+
+// recalLen re-calculates the length
+func (b *UnsafeLinkBuffer) recalLen(delta int) (length int) {
+	return int(atomic.AddInt64(&b.length, int64(delta)))
+}
+
+// growth directly creates the next node when b.write is not enough.
+func (b *UnsafeLinkBuffer) growth(n int) {
+	if n <= 0 {
 		return
 	}
-
-	// set nil tail
-	b.write.next = newLinkBufferNode(0)
-	b.write = b.write.next
-	b.flush = b.write
-	return
+	// the memory of a readonly node is not malloced by us, so skip those nodes
+	for b.write.getMode(readonlyMask) || cap(b.write.buf)-b.write.malloc < n {
+		if b.write.next == nil {
+			b.write.next = newLinkBufferNode(n)
+			b.write = b.write.next
+			return
+		}
+		b.write = b.write.next
+	}
 }

-// recalLen re-calculate the length
-func (b *LinkBuffer) recalLen(delta int) (length int) {
-	return int(atomic.AddInt64(&b.length, int64(delta)))
+// isSingleNode determines whether reading needs to cross nodes.
+// Must require b.Len() > 0
+func (b *UnsafeLinkBuffer) isSingleNode(readN int) (single bool) {
+	if readN <= 0 {
+		return true
+	}
+	l := b.read.Len()
+	for l == 0 && b.read != b.flush {
+		b.read = b.read.next
+		l = b.read.Len()
+	}
+	return l >= readN
 }

 // ------------------------------------------ implement link node ------------------------------------------

@@ -684,9 +730,9 @@ func (b *LinkBuffer) recalLen(delta int) (length int) {
 func newLinkBufferNode(size int) *linkBufferNode {
 	var node = linkedPool.Get().(*linkBufferNode)
 	// reset node offset
-	node.off, node.malloc, node.refer, node.readonly = 0, 0, 1, false
+	node.off, node.malloc, node.refer, node.mode = 0, 0, 1, defaultLinkBufferMode
 	if size <= 0 {
-		node.readonly = true
+		node.setMode(readonlyMask, true)
 		return node
 	}
 	if size < LinkBufferCap {
 		size = LinkBufferCap
 	}
 	node.buf = malloc(0, size)
 	return node
 }
@@ -705,13 +751,13 @@ var linkedPool = sync.Pool{
 }

 type linkBufferNode struct {
-	buf      []byte          // buffer
-	off      int             // read-offset
-	malloc   int             // write-offset
-	refer    int32           // reference count
-	readonly bool            // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false
-	origin   *linkBufferNode // the root node of the extends
-	next     *linkBufferNode // the next node of the linked buffer
+	buf    []byte          // buffer
+	off    int             // read-offset
+	malloc int             // write-offset
+	refer  int32           // reference count
+	mode   uint8           // mode stores all boolean bit statuses
+	origin *linkBufferNode // the root node of the extends
+	next   *linkBufferNode // the next node of the linked buffer
 }

 func (node *linkBufferNode) Len() (l int) {
@@ -772,7 +818,7 @@ func (node *linkBufferNode) Release() (err error) {
 	// release self
 	if atomic.AddInt32(&node.refer, -1) == 0 {
 		// readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache.
-		if !node.readonly {
+		if node.reusable() {
 			free(node.buf)
 		}
 		node.buf, node.origin, node.next = nil, nil, nil
@@ -781,68 +827,18 @@ func (node *linkBufferNode) Release() (err error) {
 	return nil
 }

-// ------------------------------------------ private function ------------------------------------------
-
-// growth directly create the next node, when b.write is not enough.
-func (b *LinkBuffer) growth(n int) {
-	if n <= 0 {
-		return
-	}
-	// Must skip read-only node.
-	for b.write.readonly || cap(b.write.buf)-b.write.malloc < n {
-		if b.write.next == nil {
-			b.write.next = newLinkBufferNode(n)
-			b.write = b.write.next
-			return
-		}
-		b.write = b.write.next
-	}
+func (node *linkBufferNode) getMode(mask uint8) bool {
+	return node.mode&mask > 0
 }

-// isSingleNode determines whether reading needs to cross nodes.
-// Must require b.Len() > 0
-func (b *LinkBuffer) isSingleNode(readN int) (single bool) {
-	if readN <= 0 {
-		return true
-	}
-	l := b.read.Len()
-	for l == 0 && b.read != b.flush {
-		b.read = b.read.next
-		l = b.read.Len()
-	}
-	return l >= readN
-}
-
-// zero-copy slice convert to string
-func unsafeSliceToString(b []byte) string {
-	return *(*string)(unsafe.Pointer(&b))
-}
-
-// zero-copy slice convert to string
-func unsafeStringToSlice(s string) (b []byte) {
-	p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
-	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
-	hdr.Data = uintptr(p)
-	hdr.Cap = len(s)
-	hdr.Len = len(s)
-	return b
-}
-
-// mallocMax is 8MB
-const mallocMax = block8k * block1k
-
-// malloc limits the cap of the buffer from mcache.
-func malloc(size, capacity int) []byte {
-	if capacity > mallocMax {
-		return make([]byte, size, capacity)
+func (node *linkBufferNode) setMode(mask uint8, enable bool) {
+	if enable {
+		node.mode = node.mode | mask
+	} else {
+		node.mode = node.mode & ^mask
 	}
-	return mcache.Malloc(size, capacity)
 }

-// free limits the cap of the buffer from mcache.
-func free(buf []byte) {
-	if cap(buf) > mallocMax {
-		return
-	}
-	mcache.Free(buf)
+func (node *linkBufferNode) reusable() bool {
+	return node.mode&(nocopyReadMask|readonlyMask) == 0
 }
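Taken together, getMode/setMode/reusable define the new lifecycle rule: a node's buffer may go back to mcache only when neither bit is set. Spelled out (a paraphrase of the code above, using the patch's own identifiers):

```go
// mode bits   readonly  nocopyRead  reusable()  meaning
// 0000 0000   no        no          true        normal node; Release() frees buf to mcache
// 0000 0001   yes       no          false       memory not owned (Refer/WriteDirect/WriteBinary)
// 0000 0010   no        yes         false       bytes still aliased by a ReadBinary result
// 0000 0011   yes       yes         false       both constraints apply

node := newLinkBufferNode(block4k) // size > 0: owned memory, mode == defaultLinkBufferMode
node.setMode(nocopyReadMask, true) // hand its bytes out without copying
_ = node.reusable()                // now false: Release() must not free node.buf
```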
diff --git a/nocopy_linkbuffer_norace.go b/nocopy_linkbuffer_norace.go
new file mode 100644
index 00000000..8a07382f
--- /dev/null
+++ b/nocopy_linkbuffer_norace.go
@@ -0,0 +1,20 @@
+// Copyright 2024 CloudWeGo Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !race
+// +build !race
+
+package netpoll
+
+type LinkBuffer = UnsafeLinkBuffer
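The one-line file above is half of a build-tag switch: under a normal build, the `LinkBuffer` name now aliases the lock-free `UnsafeLinkBuffer`, while `go build -race` / `go test -race` (which set the `race` build tag) select the mutex-guarded `SafeLinkBuffer` from the file below. Side by side (both declarations are exactly the ones in this patch):

```go
//go:build !race
type LinkBuffer = UnsafeLinkBuffer // nocopy_linkbuffer_norace.go

//go:build race
type LinkBuffer = SafeLinkBuffer // nocopy_linkbuffer_race.go
```

Because these are type aliases rather than new type definitions, callers keep the `*LinkBuffer` spelling everywhere and pay the locking cost only in race-enabled test builds.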
diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go
index 4166222e..642b01c6 100644
--- a/nocopy_linkbuffer_race.go
+++ b/nocopy_linkbuffer_race.go
@@ -18,612 +18,175 @@

 package netpoll

 import (
-	"bytes"
-	"errors"
-	"fmt"
-	"reflect"
 	"sync"
-	"sync/atomic"
-	"unsafe"
-
-	"github.com/bytedance/gopkg/lang/mcache"
 )

-// BinaryInplaceThreshold marks the minimum value of the nocopy slice length,
-// which is the threshold to use copy to minimize overhead.
-const BinaryInplaceThreshold = block4k
-
-// LinkBufferCap that can be modified marks the minimum value of each node of LinkBuffer.
-var LinkBufferCap = block4k
-
-// NewLinkBuffer size defines the initial capacity, but there is no readable data.
-func NewLinkBuffer(size ...int) *LinkBuffer {
-	var buf = &LinkBuffer{}
-	var l int
-	if len(size) > 0 {
-		l = size[0]
-	}
-	var node = newLinkBufferNode(l)
-	buf.head, buf.read, buf.flush, buf.write = node, node, node, node
-	return buf
-}
+type LinkBuffer = SafeLinkBuffer

-// LinkBuffer implements ReadWriter.
-type LinkBuffer struct {
+// SafeLinkBuffer is only used in Go tests built with -race
+type SafeLinkBuffer struct {
 	sync.Mutex
-	length     int32
-	mallocSize int
-
-	head  *linkBufferNode // release head
-	read  *linkBufferNode // read head
-	flush *linkBufferNode // malloc head
-	write *linkBufferNode // malloc tail
-
-	caches [][]byte // buf allocated by Next when cross-package, which should be freed when release
-}
-
-var _ Reader = &LinkBuffer{}
-var _ Writer = &LinkBuffer{}
-
-// Len implements Reader.
-func (b *LinkBuffer) Len() int {
-	l := atomic.LoadInt32(&b.length)
-	return int(l)
-}
-
-// IsEmpty check if this LinkBuffer is empty.
-func (b *LinkBuffer) IsEmpty() (ok bool) {
-	return b.Len() == 0
+	UnsafeLinkBuffer
 }

 // ------------------------------------------ implement zero-copy reader ------------------------------------------

 // Next implements Reader.
-func (b *LinkBuffer) Next(n int) (p []byte, err error) {
+func (b *SafeLinkBuffer) Next(n int) (p []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return
-	}
-	// check whether enough or not.
-	if b.Len() < n {
-		return p, fmt.Errorf("link buffer next[%d] not enough", n)
-	}
-	b.recalLen(-n) // re-cal length
-
-	// single node
-	if b.isSingleNode(n) {
-		return b.read.Next(n), nil
-	}
-	// multiple nodes
-	var pIdx int
-	if block1k < n && n <= mallocMax {
-		p = malloc(n, n)
-		b.caches = append(b.caches, p)
-	} else {
-		p = make([]byte, n)
-	}
-	var l int
-	for ack := n; ack > 0; ack = ack - l {
-		l = b.read.Len()
-		if l >= ack {
-			pIdx += copy(p[pIdx:], b.read.Next(ack))
-			break
-		} else if l > 0 {
-			pIdx += copy(p[pIdx:], b.read.Next(l))
-		}
-		b.read = b.read.next
-	}
-	_ = pIdx
-	return p, nil
+	return b.UnsafeLinkBuffer.Next(n)
 }

-// Peek does not have an independent lifecycle, and there is no signal to
-// indicate that Peek content can be released, so Peek will not introduce mcache for now.
-func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
+// Peek implements Reader.
+func (b *SafeLinkBuffer) Peek(n int) (p []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return
-	}
-	// check whether enough or not.
-	if b.Len() < n {
-		return p, fmt.Errorf("link buffer peek[%d] not enough", n)
-	}
-	// single node
-	if b.isSingleNode(n) {
-		return b.read.Peek(n), nil
-	}
-	// multiple nodes
-	var pIdx int
-	if block1k < n && n <= mallocMax {
-		p = malloc(n, n)
-		b.caches = append(b.caches, p)
-	} else {
-		p = make([]byte, n)
-	}
-	var node = b.read
-	var l int
-	for ack := n; ack > 0; ack = ack - l {
-		l = node.Len()
-		if l >= ack {
-			pIdx += copy(p[pIdx:], node.Peek(ack))
-			break
-		} else if l > 0 {
-			pIdx += copy(p[pIdx:], node.Peek(l))
-		}
-		node = node.next
-	}
-	_ = pIdx
-	return p, nil
+	return b.UnsafeLinkBuffer.Peek(n)
 }

 // Skip implements Reader.
-func (b *LinkBuffer) Skip(n int) (err error) {
+func (b *SafeLinkBuffer) Skip(n int) (err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return
-	}
-	// check whether enough or not.
-	if b.Len() < n {
-		return fmt.Errorf("link buffer skip[%d] not enough", n)
-	}
-	b.recalLen(-n) // re-cal length
-
-	var l int
-	for ack := n; ack > 0; ack = ack - l {
-		l = b.read.Len()
-		if l >= ack {
-			b.read.off += ack
-			break
-		}
-		b.read = b.read.next
-	}
-	return nil
+	return b.UnsafeLinkBuffer.Skip(n)
 }

-// Until returns a slice ends with the delim in the buffer.
-func (b *LinkBuffer) Until(delim byte) (line []byte, err error) {
+// Until implements Reader.
+func (b *SafeLinkBuffer) Until(delim byte) (line []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	n := b.indexByte(delim, 0)
-	if n < 0 {
-		return nil, fmt.Errorf("link buffer cannot find delim: '%b'", delim)
-	}
-	return b.Next(n + 1)
+	return b.UnsafeLinkBuffer.Until(delim)
 }

-// Release the node that has been read.
-// b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice
-func (b *LinkBuffer) Release() (err error) {
+// Release implements Reader.
+func (b *SafeLinkBuffer) Release() (err error) {
 	b.Lock()
 	defer b.Unlock()
-	return b.release()
-}
-
-func (b *LinkBuffer) release() (err error) {
-	for b.read != b.flush && b.read.Len() == 0 {
-		b.read = b.read.next
-	}
-	for b.head != b.read {
-		node := b.head
-		b.head = b.head.next
-		node.Release()
-	}
-	for i := range b.caches {
-		free(b.caches[i])
-		b.caches[i] = nil
-	}
-	b.caches = b.caches[:0]
-	return nil
+	return b.UnsafeLinkBuffer.Release()
 }

 // ReadString implements Reader.
-func (b *LinkBuffer) ReadString(n int) (s string, err error) {
+func (b *SafeLinkBuffer) ReadString(n int) (s string, err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return
-	}
-	// check whether enough or not.
-	if b.Len() < n {
-		return s, fmt.Errorf("link buffer read string[%d] not enough", n)
-	}
-	return unsafeSliceToString(b.readBinary(n)), nil
+	return b.UnsafeLinkBuffer.ReadString(n)
 }

 // ReadBinary implements Reader.
-func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) {
+func (b *SafeLinkBuffer) ReadBinary(n int) (p []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return
-	}
-	// check whether enough or not.
-	if b.Len() < n {
-		return p, fmt.Errorf("link buffer read binary[%d] not enough", n)
-	}
-	return b.readBinary(n), nil
-}
-
-// readBinary cannot use mcache, because the memory allocated by readBinary will not be recycled.
-func (b *LinkBuffer) readBinary(n int) (p []byte) {
-	b.recalLen(-n) // re-cal length
-
-	// single node
-	p = make([]byte, n)
-	if b.isSingleNode(n) {
-		copy(p, b.read.Next(n))
-		return p
-	}
-	// multiple nodes
-	var pIdx int
-	var l int
-	for ack := n; ack > 0; ack = ack - l {
-		l = b.read.Len()
-		if l >= ack {
-			pIdx += copy(p[pIdx:], b.read.Next(ack))
-			break
-		} else if l > 0 {
-			pIdx += copy(p[pIdx:], b.read.Next(l))
-		}
-		b.read = b.read.next
-	}
-	_ = pIdx
-	return p
+	return b.UnsafeLinkBuffer.ReadBinary(n)
 }

 // ReadByte implements Reader.
-func (b *LinkBuffer) ReadByte() (p byte, err error) {
+func (b *SafeLinkBuffer) ReadByte() (p byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	// check whether enough or not.
-	if b.Len() < 1 {
-		return p, errors.New("link buffer read byte is empty")
-	}
-	b.recalLen(-1) // re-cal length
-	for {
-		if b.read.Len() >= 1 {
-			return b.read.Next(1)[0], nil
-		}
-		b.read = b.read.next
-	}
+	return b.UnsafeLinkBuffer.ReadByte()
 }

-// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer,
-// and only holds the ability of Reader.
-//
-// Slice will automatically execute a Release.
-func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
+// Slice implements Reader.
+func (b *SafeLinkBuffer) Slice(n int) (r Reader, err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return NewLinkBuffer(0), nil
-	}
-	// check whether enough or not.
-	if b.Len() < n {
-		return r, fmt.Errorf("link buffer readv[%d] not enough", n)
-	}
-	b.recalLen(-n) // re-cal length
-
-	// just use for range
-	p := &LinkBuffer{
-		length: int32(n),
-	}
-	defer func() {
-		// set to read-only
-		p.flush = p.flush.next
-		p.write = p.flush
-	}()
-
-	// single node
-	if b.isSingleNode(n) {
-		node := b.read.Refer(n)
-		p.head, p.read, p.flush = node, node, node
-		return p, nil
-	}
-	// multiple nodes
-	var l = b.read.Len()
-	node := b.read.Refer(l)
-	b.read = b.read.next
-
-	p.head, p.read, p.flush = node, node, node
-	for ack := n - l; ack > 0; ack = ack - l {
-		l = b.read.Len()
-		if l >= ack {
-			p.flush.next = b.read.Refer(ack)
-			p.flush = p.flush.next
-			break
-		} else if l > 0 {
-			p.flush.next = b.read.Refer(l)
-			p.flush = p.flush.next
-		}
-		b.read = b.read.next
-	}
-	return p, b.release()
+	return b.UnsafeLinkBuffer.Slice(n)
 }

 // ------------------------------------------ implement zero-copy writer ------------------------------------------

-// Malloc pre-allocates memory, which is not readable, and becomes readable data after submission(e.g. Flush).
-func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) {
+// Malloc implements Writer.
+func (b *SafeLinkBuffer) Malloc(n int) (buf []byte, err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n <= 0 {
-		return
-	}
-	b.mallocSize += n
-	b.growth(n)
-	return b.write.Malloc(n), nil
+	return b.UnsafeLinkBuffer.Malloc(n)
 }

 // MallocLen implements Writer.
-func (b *LinkBuffer) MallocLen() (length int) {
+func (b *SafeLinkBuffer) MallocLen() (length int) {
 	b.Lock()
 	defer b.Unlock()
-	length = b.mallocSize
-	return length
+	return b.UnsafeLinkBuffer.MallocLen()
 }

-// MallocAck will keep the first n malloc bytes and discard the rest.
-func (b *LinkBuffer) MallocAck(n int) (err error) {
+// MallocAck implements Writer.
+func (b *SafeLinkBuffer) MallocAck(n int) (err error) {
 	b.Lock()
 	defer b.Unlock()
-	if n < 0 {
-		return fmt.Errorf("link buffer malloc ack[%d] invalid", n)
-	}
-	b.mallocSize = n
-	b.write = b.flush
-
-	var l int
-	for ack := n; ack > 0; ack = ack - l {
-		l = b.write.malloc - len(b.write.buf)
-		if l >= ack {
-			b.write.malloc = ack + len(b.write.buf)
-			break
-		}
-		b.write = b.write.next
-	}
-	// discard the rest
-	for node := b.write.next; node != nil; node = node.next {
-		node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
-	}
-	return nil
+	return b.UnsafeLinkBuffer.MallocAck(n)
 }

-// Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned.
-func (b *LinkBuffer) Flush() (err error) {
+// Flush implements Writer.
+func (b *SafeLinkBuffer) Flush() (err error) {
 	b.Lock()
 	defer b.Unlock()
-	b.mallocSize = 0
-	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
-	if cap(b.write.buf) > pagesize {
-		b.write.next = newLinkBufferNode(0)
-		b.write = b.write.next
-	}
-	var n int
-	for node := b.flush; node != b.write.next; node = node.next {
-		delta := node.malloc - len(node.buf)
-		if delta > 0 {
-			n += delta
-			node.buf = node.buf[:node.malloc]
-		}
-	}
-	b.flush = b.write
-	// re-cal length
-	b.recalLen(n)
-	return nil
+	return b.UnsafeLinkBuffer.Flush()
 }

 // Append implements Writer.
-func (b *LinkBuffer) Append(w Writer) (err error) {
-	var buf, ok = w.(*LinkBuffer)
-	if !ok {
-		return errors.New("unsupported writer which is not LinkBuffer")
-	}
-	return b.WriteBuffer(buf)
+func (b *SafeLinkBuffer) Append(w Writer) (err error) {
+	b.Lock()
+	defer b.Unlock()
+	return b.UnsafeLinkBuffer.Append(w)
 }

-// WriteBuffer will not submit(e.g. Flush) data to ensure normal use of MallocLen.
-// you must actively submit before read the data.
-// The argument buf can't be used after calling WriteBuffer. (set it to nil)
-func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
+// WriteBuffer implements Writer.
+func (b *SafeLinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) {
 	b.Lock()
 	defer b.Unlock()
-	if buf == nil {
-		return
-	}
-	bufLen, bufMallocLen := buf.Len(), buf.MallocLen()
-	if bufLen+bufMallocLen <= 0 {
-		return nil
-	}
-	b.write.next = buf.read
-	b.write = buf.write
-
-	// close buf, prevents reuse.
-	for buf.head != buf.read {
-		nd := buf.head
-		buf.head = buf.head.next
-		nd.Release()
-	}
-	for buf.write = buf.write.next; buf.write != nil; {
-		nd := buf.write
-		buf.write = buf.write.next
-		nd.Release()
-	}
-	buf.length, buf.mallocSize, buf.head, buf.read, buf.flush, buf.write = 0, 0, nil, nil, nil, nil
-
-	// DON'T MODIFY THE CODE BELOW UNLESS YOU KNOW WHAT YOU ARE DOING !
-	//
-	// You may encounter a chain of bugs and not be able to
-	// find out within a week that they are caused by modifications here.
-	//
-	// After release buf, continue to adjust b.
-	b.write.next = nil
-	if bufLen > 0 {
-		b.recalLen(bufLen)
-	}
-	b.mallocSize += bufMallocLen
-	return nil
+	return b.UnsafeLinkBuffer.WriteBuffer(buf)
 }

 // WriteString implements Writer.
-func (b *LinkBuffer) WriteString(s string) (n int, err error) {
-	if len(s) == 0 {
-		return
-	}
-	buf := unsafeStringToSlice(s)
-	return b.WriteBinary(buf)
+func (b *SafeLinkBuffer) WriteString(s string) (n int, err error) {
+	b.Lock()
+	defer b.Unlock()
+	return b.UnsafeLinkBuffer.WriteString(s)
 }

 // WriteBinary implements Writer.
-func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) {
+func (b *SafeLinkBuffer) WriteBinary(p []byte) (n int, err error) {
 	b.Lock()
 	defer b.Unlock()
-	n = len(p)
-	if n == 0 {
-		return
-	}
-	b.mallocSize += n
-
-	// TODO: Verify that all nocopy is possible under mcache.
-	if n > BinaryInplaceThreshold {
-		// expand buffer directly with nocopy
-		b.write.next = newLinkBufferNode(0)
-		b.write = b.write.next
-		b.write.buf, b.write.malloc = p[:0], n
-		return n, nil
-	}
-	// here will copy
-	b.growth(n)
-	buf := b.write.Malloc(n)
-	return copy(buf, p), nil
+	return b.UnsafeLinkBuffer.WriteBinary(p)
 }

 // WriteDirect cannot be mixed with WriteString or WriteBinary functions.
-func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
+func (b *SafeLinkBuffer) WriteDirect(p []byte, remainLen int) error {
 	b.Lock()
 	defer b.Unlock()
-	n := len(p)
-	if n == 0 || remainLen < 0 {
-		return nil
-	}
-	// find origin
-	origin := b.flush
-	malloc := b.mallocSize - remainLen // calculate the remaining malloc length
-	for t := origin.malloc - len(origin.buf); t < malloc; t = origin.malloc - len(origin.buf) {
-		malloc -= t
-		origin = origin.next
-	}
-	// Add the buf length of the original node
-	malloc += len(origin.buf)
-
-	// Create dataNode and newNode and insert them into the chain
-	dataNode := newLinkBufferNode(0)
-	dataNode.buf, dataNode.malloc = p[:0], n
-
-	if remainLen > 0 {
-		newNode := newLinkBufferNode(0)
-		newNode.off = malloc
-		newNode.buf = origin.buf[:malloc]
-		newNode.malloc = origin.malloc
-		newNode.readonly = false
-		origin.malloc = malloc
-		origin.readonly = true
-
-		// link nodes
-		dataNode.next = newNode
-		newNode.next = origin.next
-		origin.next = dataNode
-	} else {
-		// link nodes
-		dataNode.next = origin.next
-		origin.next = dataNode
-	}
-
-	// adjust b.write
-	for b.write.next != nil {
-		b.write = b.write.next
-	}
-
-	b.mallocSize += n
-	return nil
+	return b.UnsafeLinkBuffer.WriteDirect(p, remainLen)
 }

 // WriteByte implements Writer.
-func (b *LinkBuffer) WriteByte(p byte) (err error) {
-	dst, err := b.Malloc(1)
-	if len(dst) == 1 {
-		dst[0] = p
-	}
-	return err
+func (b *SafeLinkBuffer) WriteByte(p byte) (err error) {
+	b.Lock()
+	defer b.Unlock()
+	return b.UnsafeLinkBuffer.WriteByte(p)
 }

 // Close will recycle all buffer.
-func (b *LinkBuffer) Close() (err error) {
+func (b *SafeLinkBuffer) Close() (err error) {
 	b.Lock()
 	defer b.Unlock()
-	atomic.StoreInt32(&b.length, 0)
-	b.mallocSize = 0
-	// just release all
-	b.release()
-	for node := b.head; node != nil; {
-		nd := node
-		node = node.next
-		nd.Release()
-	}
-	b.head, b.read, b.flush, b.write = nil, nil, nil, nil
-	return nil
+	return b.UnsafeLinkBuffer.Close()
 }

 // ------------------------------------------ implement connection interface ------------------------------------------

-// Bytes returns all the readable bytes of this LinkBuffer.
-func (b *LinkBuffer) Bytes() []byte {
+// Bytes returns all the readable bytes of this SafeLinkBuffer.
+func (b *SafeLinkBuffer) Bytes() []byte {
 	b.Lock()
 	defer b.Unlock()
-	node, flush := b.read, b.flush
-	if node == flush {
-		return node.buf[node.off:]
-	}
-	n := 0
-	p := make([]byte, b.Len())
-	for ; node != flush; node = node.next {
-		if node.Len() > 0 {
-			n += copy(p[n:], node.buf[node.off:])
-		}
-	}
-	n += copy(p[n:], flush.buf[flush.off:])
-	return p[:n]
+	return b.UnsafeLinkBuffer.Bytes()
 }

 // GetBytes will read and fill the slice p as much as possible.
-// If p is not passed, return all readable bytes.
-func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
+func (b *SafeLinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
 	b.Lock()
 	defer b.Unlock()
-	node, flush := b.read, b.flush
-	if len(p) == 0 {
-		n := 0
-		for ; node != flush; node = node.next {
-			n++
-		}
-		node = b.read
-		p = make([][]byte, n)
-	}
-	var i int
-	for i = 0; node != flush && i < len(p); node = node.next {
-		if node.Len() > 0 {
-			p[i] = node.buf[node.off:]
-			i++
-		}
-	}
-	if i < len(p) {
-		p[i] = flush.buf[flush.off:]
-		i++
-	}
-	return p[:i]
+	return b.UnsafeLinkBuffer.GetBytes(p)
 }

 // book will grow and malloc buffer to hold data.
@@ -632,266 +195,36 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
 // maxSize: The maximum size of data between two Release(). In some cases, this can
 //
 //	guarantee all data allocated in one node to reduce copy.
-func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
+func (b *SafeLinkBuffer) book(bookSize, maxSize int) (p []byte) {
 	b.Lock()
 	defer b.Unlock()
-	l := cap(b.write.buf) - b.write.malloc
-	// grow linkBuffer
-	if l == 0 {
-		l = maxSize
-		b.write.next = newLinkBufferNode(maxSize)
-		b.write = b.write.next
-	}
-	if l > bookSize {
-		l = bookSize
-	}
-	return b.write.Malloc(l)
+	return b.UnsafeLinkBuffer.book(bookSize, maxSize)
 }

 // bookAck will ack the first n malloc bytes and discard the rest.
 //
 // length: The size of data in inputBuffer. It is used to calculate the maxSize
-func (b *LinkBuffer) bookAck(n int) (length int, err error) {
+func (b *SafeLinkBuffer) bookAck(n int) (length int, err error) {
 	b.Lock()
 	defer b.Unlock()
-	b.write.malloc = n + len(b.write.buf)
-	b.write.buf = b.write.buf[:b.write.malloc]
-	b.flush = b.write
-
-	// re-cal length
-	length = b.recalLen(n)
-	return length, nil
+	return b.UnsafeLinkBuffer.bookAck(n)
 }

 // calcMaxSize will calculate the data size between two Release()
-func (b *LinkBuffer) calcMaxSize() (sum int) {
-	for node := b.head; node != b.read; node = node.next {
-		sum += len(node.buf)
-	}
-	sum += len(b.read.buf)
-	return sum
-}
-
-func (b *LinkBuffer) indexByte(c byte, skip int) int {
+func (b *SafeLinkBuffer) calcMaxSize() (sum int) {
 	b.Lock()
 	defer b.Unlock()
-	size := b.Len()
-	if skip >= size {
-		return -1
-	}
-	var unread, n, l int
-	node := b.read
-	for unread = size; unread > 0; unread -= n {
-		l = node.Len()
-		if l >= unread { // last node
-			n = unread
-		} else { // read full node
-			n = l
-		}
-
-		// skip current node
-		if skip >= n {
-			skip -= n
-			node = node.next
-			continue
-		}
-		i := bytes.IndexByte(node.Peek(n)[skip:], c)
-		if i >= 0 {
-			return (size - unread) + skip + i // past_read + skip_read + index
-		}
-		skip = 0 // no skip bytes
-		node = node.next
-	}
-	return -1
-}
-
-// resetTail will reset tail node or add an empty tail node to
-// guarantee the tail node is not larger than 8KB
-func (b *LinkBuffer) resetTail(maxSize int) {
-	// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
-	if maxSize <= pagesize {
-		b.write.Reset()
-		return
-	}
-
-	// set nil tail
-	b.write.next = newLinkBufferNode(0)
-	b.write = b.write.next
-	b.flush = b.write
-	return
-}
-
-// recalLen re-calculate the length
-func (b *LinkBuffer) recalLen(delta int) (length int) {
-	return int(atomic.AddInt32(&b.length, int32(delta)))
-}
-
-// ------------------------------------------ implement link node ------------------------------------------
-
-// newLinkBufferNode create or reuse linkBufferNode.
-// Nodes with size <= 0 are marked as readonly, which means the node.buf is not allocated by this mcache.
-func newLinkBufferNode(size int) *linkBufferNode {
-	var node = linkedPool.Get().(*linkBufferNode)
-	// reset node offset
-	node.off, node.malloc, node.refer, node.readonly = 0, 0, 1, false
-	if size <= 0 {
-		node.readonly = true
-		return node
-	}
-	if size < LinkBufferCap {
-		size = LinkBufferCap
-	}
-	node.buf = malloc(0, size)
-	return node
-}
-
-var linkedPool = sync.Pool{
-	New: func() interface{} {
-		return &linkBufferNode{
-			refer: 1, // 自带 1 引用
-		}
-	},
-}
-
-type linkBufferNode struct {
-	buf      []byte          // buffer
-	off      int             // read-offset
-	malloc   int             // write-offset
-	refer    int32           // reference count
-	readonly bool            // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false
-	origin   *linkBufferNode // the root node of the extends
-	next     *linkBufferNode // the next node of the linked buffer
-}
-
-func (node *linkBufferNode) Len() (l int) {
-	return len(node.buf) - node.off
-}
-
-func (node *linkBufferNode) IsEmpty() (ok bool) {
-	return node.off == len(node.buf)
-}
-
-func (node *linkBufferNode) Reset() {
-	if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 {
-		return
-	}
-	node.off, node.malloc = 0, 0
-	node.buf = node.buf[:0]
-	return
-}
-
-func (node *linkBufferNode) Next(n int) (p []byte) {
-	off := node.off
-	node.off += n
-	return node.buf[off:node.off]
-}
-
-func (node *linkBufferNode) Peek(n int) (p []byte) {
-	return node.buf[node.off : node.off+n]
-}
-
-func (node *linkBufferNode) Malloc(n int) (buf []byte) {
-	malloc := node.malloc
-	node.malloc += n
-	return node.buf[malloc:node.malloc]
-}
-
-// Refer holds a reference count at the same time as Next, and releases the real buffer after Release.
-// The node obtained by Refer is read-only.
-func (node *linkBufferNode) Refer(n int) (p *linkBufferNode) {
-	p = newLinkBufferNode(0)
-	p.buf = node.Next(n)
-
-	if node.origin != nil {
-		p.origin = node.origin
-	} else {
-		p.origin = node
-	}
-	atomic.AddInt32(&p.origin.refer, 1)
-	return p
-}
-
-// Release consists of two parts:
-// 1. reduce the reference count of itself and origin.
-// 2. recycle the buf when the reference count is 0.
-func (node *linkBufferNode) Release() (err error) {
-	if node.origin != nil {
-		node.origin.Release()
-	}
-	// release self
-	if atomic.AddInt32(&node.refer, -1) == 0 {
-		// readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache.
-		if !node.readonly {
-			free(node.buf)
-		}
-		node.buf, node.origin, node.next = nil, nil, nil
-		linkedPool.Put(node)
-	}
-	return nil
-}
-
-// ------------------------------------------ private function ------------------------------------------
-
-// growth directly create the next node, when b.write is not enough.
-func (b *LinkBuffer) growth(n int) {
-	if n <= 0 {
-		return
-	}
-	// Must skip read-only node.
-	for b.write.readonly || cap(b.write.buf)-b.write.malloc < n {
-		if b.write.next == nil {
-			b.write.next = newLinkBufferNode(n)
-			b.write = b.write.next
-			return
-		}
-		b.write = b.write.next
-	}
-}
-
-// isSingleNode determines whether reading needs to cross nodes.
-// Must require b.Len() > 0
-func (b *LinkBuffer) isSingleNode(readN int) (single bool) {
-	if readN <= 0 {
-		return true
-	}
-	l := b.read.Len()
-	for l == 0 && b.read != b.flush {
-		b.read = b.read.next
-		l = b.read.Len()
-	}
-	return l >= readN
-}
-
-// zero-copy slice convert to string
-func unsafeSliceToString(b []byte) string {
-	return *(*string)(unsafe.Pointer(&b))
-}
-
-// zero-copy slice convert to string
-func unsafeStringToSlice(s string) (b []byte) {
-	p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
-	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
-	hdr.Data = uintptr(p)
-	hdr.Cap = len(s)
-	hdr.Len = len(s)
-	return b
-}
-
-// mallocMax is 8MB
-const mallocMax = block8k * block1k
-
-// malloc limits the cap of the buffer from mcache.
-func malloc(size, capacity int) []byte {
-	if capacity > mallocMax {
-		return make([]byte, size, capacity)
-	}
-	return mcache.Malloc(size, capacity)
+	return b.UnsafeLinkBuffer.calcMaxSize()
 }

-// free limits the cap of the buffer from mcache.
-func free(buf []byte) {
-	if cap(buf) > mallocMax {
-		return
-	}
-	mcache.Free(buf)
+func (b *SafeLinkBuffer) resetTail(maxSize int) {
+	b.Lock()
+	defer b.Unlock()
+	b.UnsafeLinkBuffer.resetTail(maxSize)
 }
+
+func (b *SafeLinkBuffer) indexByte(c byte, skip int) int {
+	b.Lock()
+	defer b.Unlock()
+	return b.UnsafeLinkBuffer.indexByte(c, skip)
+}
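Every wrapper above has the same three-line body: take the lock, delegate to the embedded `UnsafeLinkBuffer`, unlock via defer. The pattern in miniature, detached from netpoll (a generic illustration, not code from the patch):

```go
package example

import "sync"

type counter struct{ n int }

func (c *counter) Add(delta int) int { c.n += delta; return c.n }

// safeCounter serializes access to an otherwise unsynchronized value by
// embedding both the lock and the implementation; the method set is
// inherited, and each override is just lock-delegate-unlock.
type safeCounter struct {
	sync.Mutex
	counter
}

func (c *safeCounter) Add(delta int) int {
	c.Lock()
	defer c.Unlock()
	return c.counter.Add(delta)
}
```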
diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go
index a376b7bf..25770762 100644
--- a/nocopy_linkbuffer_test.go
+++ b/nocopy_linkbuffer_test.go
@@ -20,6 +20,7 @@ package netpoll
 import (
 	"bytes"
 	"fmt"
+	"runtime"
 	"sync/atomic"
 	"testing"
 )
@@ -491,6 +492,107 @@ func TestWriteDirect(t *testing.T) {
 	}
 }

+func TestNoCopyWriteAndRead(t *testing.T) {
+	// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
+	const (
+		mallocLen = 4096 * 2
+		originLen = 4096
+		dataLen   = 512
+		newLen    = 16
+		normalLen = 4096
+	)
+	buf := NewLinkBuffer()
+	bt, _ := buf.Malloc(mallocLen)
+	originBuf := bt[:originLen]
+	newBuf := bt[originLen : originLen+newLen]
+
+	// write origin_node
+	for i := 0; i < originLen; i++ {
+		bt[i] = 'a'
+	}
+	// write data_node
+	userBuf := make([]byte, dataLen)
+	for i := 0; i < len(userBuf); i++ {
+		userBuf[i] = 'b'
+	}
+	buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
+	// write new_node
+	for i := 0; i < newLen; i++ {
+		bt[originLen+i] = 'c'
+	}
+	buf.MallocAck(originLen + dataLen + newLen)
+	buf.Flush()
+	// write normal_node
+	normalBuf, _ := buf.Malloc(normalLen)
+	for i := 0; i < normalLen; i++ {
+		normalBuf[i] = 'd'
+	}
+	buf.Flush()
+	Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)
+
+	// copy read origin_node
+	bt, _ = buf.ReadBinary(originLen)
+	for i := 0; i < len(bt); i++ {
+		MustTrue(t, bt[i] == 'a')
+	}
+	MustTrue(t, &bt[0] != &originBuf[0])
+	// next read node is the data node and must be readonly and non-reusable
+	MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
+	// copy read data_node
+	bt, _ = buf.ReadBinary(dataLen)
+	for i := 0; i < len(bt); i++ {
+		MustTrue(t, bt[i] == 'b')
+	}
+	MustTrue(t, &bt[0] != &userBuf[0])
+	// copy read new_node
+	bt, _ = buf.ReadBinary(newLen)
+	for i := 0; i < len(bt); i++ {
+		MustTrue(t, bt[i] == 'c')
+	}
+	MustTrue(t, &bt[0] != &newBuf[0])
+	// current read node is the new node and must be reusable
+	newnode := buf.read
+	t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
+	MustTrue(t, newnode.reusable())
+	var nodeReleased int32
+	runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
+		atomic.AddInt32(&nodeReleased, 1)
+	})
+	// nocopy read normal_node
+	bt, _ = buf.ReadBinary(normalLen)
+	for i := 0; i < len(bt); i++ {
+		MustTrue(t, bt[i] == 'd')
+	}
+	MustTrue(t, &bt[0] == &normalBuf[0])
+	// the normal buffer should never be released
+	runtime.SetFinalizer(&bt[0], func(_ *byte) {
+		atomic.AddInt32(&nodeReleased, 1)
+	})
+	_ = buf.Release()
+	MustTrue(t, newnode.buf == nil)
+	for atomic.LoadInt32(&nodeReleased) == 0 {
+		runtime.GC()
+		t.Log("newnode release check failed")
+	}
+	Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
+	runtime.KeepAlive(normalBuf)
+}
+
+func TestBufferMode(t *testing.T) {
+	bufnode := newLinkBufferNode(0)
+	MustTrue(t, !bufnode.getMode(nocopyReadMask))
+	MustTrue(t, bufnode.getMode(readonlyMask))
+	MustTrue(t, !bufnode.reusable())
+
+	bufnode = newLinkBufferNode(1)
+	MustTrue(t, !bufnode.getMode(nocopyReadMask))
+	MustTrue(t, !bufnode.getMode(readonlyMask))
+	bufnode.setMode(nocopyReadMask, false)
+	MustTrue(t, !bufnode.getMode(nocopyReadMask))
+	bufnode.setMode(nocopyReadMask, true)
+	MustTrue(t, bufnode.getMode(nocopyReadMask))
+}
+
 func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) {
 	b.StopTimer()
@@ -653,3 +755,43 @@ func BenchmarkCopyString(b *testing.B) {
 		}
 	})
 }
+
+func BenchmarkNoCopyRead(b *testing.B) {
+	totalSize := 0
+	minSize := 32
+	maxSize := minSize << 9
+	for size := minSize; size <= maxSize; size = size << 1 {
+		totalSize += size
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		var buffer = NewLinkBuffer(pagesize)
+		for pb.Next() {
+			buf, err := buffer.Malloc(totalSize)
+			if len(buf) != totalSize || err != nil {
+				b.Fatal(err)
+			}
+			err = buffer.MallocAck(totalSize)
+			if err != nil {
+				b.Fatal(err)
+			}
+			err = buffer.Flush()
+			if err != nil {
+				b.Fatal(err)
+			}
+
+			for size := minSize; size <= maxSize; size = size << 1 {
+				buf, err = buffer.ReadBinary(size)
+				if len(buf) != size || err != nil {
+					b.Fatal(err)
+				}
+			}
+			// buffer.Release will not reuse memory since we use nocopy mode here
+			err = buffer.Release()
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+}
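The nocopy effect should show up directly in this benchmark's allocation figures: under this harness, every `ReadBinary` of at least `minReuseBytes` against a node no larger than 32KB returns a view into the node instead of a fresh allocation, so `go test -bench=BenchmarkNoCopyRead -benchmem` ought to report noticeably fewer bytes allocated per op than the old copying path (observed numbers will vary by machine and Go version).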