diff --git a/nocopy.go b/nocopy.go
index 80df5f9b..eaccdb1f 100644
--- a/nocopy.go
+++ b/nocopy.go
@@ -244,10 +244,11 @@ func NewIOReadWriter(rw ReadWriter) io.ReadWriter {
 }
 
 const (
-	block1k = 1 * 1024
-	block2k = 2 * 1024
-	block4k = 4 * 1024
-	block8k = 8 * 1024
+	block1k  = 1 * 1024
+	block2k  = 2 * 1024
+	block4k  = 4 * 1024
+	block8k  = 8 * 1024
+	block32k = 32 * 1024
 )
 
 const pagesize = block8k
diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go
index 555ba5ce..9ff72dbf 100644
--- a/nocopy_linkbuffer.go
+++ b/nocopy_linkbuffer.go
@@ -224,11 +224,23 @@ func (b *LinkBuffer) readBinary(n int) (p []byte) {
 	b.recalLen(-n) // re-cal length
 
 	// single node
-	p = make([]byte, n)
 	if b.isSingleNode(n) {
+		// If readBinary runs in no-copy mode, it may hold more memory, but memory access is more efficient.
+		// For example, if the user's codec needs to decode 10 strings of 100 bytes each, this saves the codec
+		// 10 separate mallocs, and the decoded strings share one compact block of memory.
+		if !b.read.getMode(reuseMask) {
+			return b.read.Next(n)
+		}
+		if n >= 128 && cap(b.read.buf) < block32k {
+			b.read.setMode(reuseMask, false)
+			return b.read.Next(n)
+		}
+		// if the underlying buffer is too large, we should not use no-copy mode
+		p = make([]byte, n)
 		copy(p, b.read.Next(n))
 		return p
 	}
+	p = make([]byte, n)
 	// multiple nodes
 	var pIdx int
 	var l int
@@ -490,9 +502,9 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error {
 	newNode.off = malloc
 	newNode.buf = origin.buf[:malloc]
 	newNode.malloc = origin.malloc
-	newNode.readonly = false
+	newNode.setMode(readonlyMask, false)
 	origin.malloc = malloc
-	origin.readonly = true
+	origin.setMode(readonlyMask, true)
 
 	// link nodes
 	dataNode.next = newNode
@@ -675,9 +687,9 @@ func (b *LinkBuffer) recalLen(delta int) (length int) {
 func newLinkBufferNode(size int) *linkBufferNode {
 	var node = linkedPool.Get().(*linkBufferNode)
 	// reset node offset
-	node.off, node.malloc, node.refer, node.readonly = 0, 0, 1, false
+	node.off, node.malloc, node.refer, node.mode = 0, 0, 1, defaultLinkBufferMode
 	if size <= 0 {
-		node.readonly = true
+		node.setMode(readonlyMask, true)
 		return node
 	}
 	if size < LinkBufferCap {
@@ -695,14 +707,22 @@ var linkedPool = sync.Pool{
 	},
 }
 
+const (
+	defaultLinkBufferMode = 1 << 0
+	// reuse mode, indicates whether the node's buffer data may be reused (recycled), default true
+	reuseMask uint8 = 1 << 0 // 0000 0001
+	// read-only mode, introduced by Refer, WriteString, WriteBinary, etc., default false
+	readonlyMask uint8 = 1 << 1 // 0000 0010
+)
+
 type linkBufferNode struct {
-	buf      []byte          // buffer
-	off      int             // read-offset
-	malloc   int             // write-offset
-	refer    int32           // reference count
-	readonly bool            // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false
-	origin   *linkBufferNode // the root node of the extends
-	next     *linkBufferNode // the next node of the linked buffer
+	buf    []byte          // buffer
+	off    int             // read-offset
+	malloc int             // write-offset
+	refer  int32           // reference count
+	mode   uint8           // mode stores all bool statuses as bit flags
+	origin *linkBufferNode // the root node of the extends
+	next   *linkBufferNode // the next node of the linked buffer
 }
 
 func (node *linkBufferNode) Len() (l int) {
@@ -763,7 +783,7 @@ func (node *linkBufferNode) Release() (err error) {
 	// release self
 	if atomic.AddInt32(&node.refer, -1) == 0 {
 		// readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache.
-		if !node.readonly {
+		if node.reusable() {
 			free(node.buf)
 		}
 		node.buf, node.origin, node.next = nil, nil, nil
@@ -779,8 +799,8 @@ func (b *LinkBuffer) growth(n int) {
 	if n <= 0 {
 		return
 	}
-	// Must skip read-only node.
-	for b.write.readonly || cap(b.write.buf)-b.write.malloc < n {
+	// the memory of a readonly node is not allocated by us, so we must skip such nodes
+	for b.write.getMode(readonlyMask) || cap(b.write.buf)-b.write.malloc < n {
 		if b.write.next == nil {
 			b.write.next = newLinkBufferNode(n)
 			b.write = b.write.next
@@ -804,6 +824,22 @@ func (b *LinkBuffer) isSingleNode(readN int) (single bool) {
 	return l >= readN
 }
 
+func (node *linkBufferNode) getMode(mask uint8) bool {
+	return node.mode&mask > 0
+}
+
+func (node *linkBufferNode) setMode(mask uint8, enable bool) {
+	if enable {
+		node.mode |= mask
+	} else {
+		node.mode &^= mask
+	}
+}
+
+func (node *linkBufferNode) reusable() bool {
+	return node.mode&reuseMask > 0 && node.mode&readonlyMask == 0
+}
+
 // zero-copy slice convert to string
 func unsafeSliceToString(b []byte) string {
 	return *(*string)(unsafe.Pointer(&b))
diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go
index c3f9b9d8..2225f623 100644
--- a/nocopy_linkbuffer_test.go
+++ b/nocopy_linkbuffer_test.go
@@ -466,6 +466,20 @@ func TestWriteDirect(t *testing.T) {
 	}
 }
 
+func TestBufferMode(t *testing.T) {
+	bufnode := newLinkBufferNode(0)
+	MustTrue(t, bufnode.getMode(reuseMask))
+	MustTrue(t, bufnode.getMode(readonlyMask))
+
+	bufnode = newLinkBufferNode(1)
+	MustTrue(t, bufnode.getMode(reuseMask))
+	MustTrue(t, !bufnode.getMode(readonlyMask))
+	bufnode.setMode(reuseMask, false)
+	MustTrue(t, !bufnode.getMode(reuseMask))
+	bufnode.setMode(reuseMask, true)
+	MustTrue(t, bufnode.getMode(reuseMask))
+}
+
 func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) {
 	b.StopTimer()
 
@@ -628,3 +642,27 @@ func BenchmarkCopyString(b *testing.B) {
 		}
 	})
 }
+
+func BenchmarkNoCopyRead(b *testing.B) {
+	totalSize := 0
+	minSize := 32
+	maxSize := minSize << 10
+	for size := minSize; size <= maxSize; size = size << 1 {
+		totalSize += size
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		var buffer = NewLinkBuffer(pagesize)
+		for pb.Next() {
+			_, _ = buffer.Malloc(totalSize)
+			_ = buffer.MallocAck(totalSize)
+			_ = buffer.Flush()
+
+			for size := minSize; size <= maxSize; size = size << 1 {
+				_, _ = buffer.ReadBinary(size)
+			}
+			_ = buffer.Release()
+		}
+	})
+}
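
Below is a small, hypothetical in-package usage sketch (not part of the change above) showing how a decoder-style caller hits the new no-copy read path: single-node reads of at least 128 bytes whose backing buffer is smaller than block32k are expected to return a slice aliasing the node's buffer and to switch the node out of reuse mode, while smaller reads keep the old copy behavior. The test name and the concrete sizes are illustrative only; the APIs used (NewLinkBuffer, Malloc, MallocAck, Flush, ReadBinary, Release) come from the diff and the existing package.

package netpoll

import "testing"

// TestNoCopyReadSketch is a hypothetical example, not part of this diff.
// It exercises both branches of the readBinary heuristic introduced above.
func TestNoCopyReadSketch(t *testing.T) {
	buf := NewLinkBuffer(pagesize)

	// Make 1 KB readable: the backing node buffer is far below block32k.
	w, _ := buf.Malloc(block1k)
	for i := range w {
		w[i] = byte(i)
	}
	_ = buf.MallocAck(block1k)
	_ = buf.Flush()

	// 64 < 128 bytes: this read is expected to copy into a fresh slice.
	small, _ := buf.ReadBinary(64)

	// 256 >= 128 bytes and the node buffer is below block32k: this read is
	// expected to alias the node buffer and clear its reuse mode, so the node
	// is no longer handed back to mcache when it is released.
	large, _ := buf.ReadBinary(256)

	if len(small) != 64 || len(large) != 256 {
		t.Error("unexpected read length")
	}
	_ = buf.Release()
}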