diff --git a/core/framing/buffered.go b/core/framing/buffered.go index 62835d1..5a0dd7c 100644 --- a/core/framing/buffered.go +++ b/core/framing/buffered.go @@ -4,38 +4,41 @@ import ( "encoding/binary" "io" "sync/atomic" + "unsafe" "github.com/rsocket/rsocket-go/core" "github.com/rsocket/rsocket-go/internal/common" "github.com/rsocket/rsocket-go/internal/u24" + uberatomic "go.uber.org/atomic" ) // bufferedFrame is basic frame implementation. type bufferedFrame struct { - inner *common.ByteBuff - refs int32 + innerPtr unsafe.Pointer + refs uberatomic.Int32 } func newBufferedFrame(inner *common.ByteBuff) *bufferedFrame { - return &bufferedFrame{ - inner: inner, - refs: 1, - } + frame := &bufferedFrame{} + atomic.StorePointer(&frame.innerPtr, unsafe.Pointer(inner)) + frame.refs.Store(1) + return frame } func (f *bufferedFrame) IncRef() int32 { - return atomic.AddInt32(&f.refs, 1) + return f.refs.Add(1) } func (f *bufferedFrame) RefCnt() int32 { - return atomic.LoadInt32(&f.refs) + return f.refs.Load() } func (f *bufferedFrame) Header() core.FrameHeader { - if f.inner == nil { + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner == nil { panic("frame has been released!") } - b := f.inner.Bytes() + b := inner.Bytes() _ = b[core.FrameHeaderLen-1] var h core.FrameHeader copy(h[:], b) @@ -43,52 +46,70 @@ func (f *bufferedFrame) Header() core.FrameHeader { } func (f *bufferedFrame) HasFlag(flag core.FrameFlag) bool { - if f.inner == nil { + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner == nil { panic("frame has been released!") } - n := binary.BigEndian.Uint16(f.inner.Bytes()[4:6]) + n := binary.BigEndian.Uint16(inner.Bytes()[4:6]) return core.FrameFlag(n&0x03FF)&flag == flag } func (f *bufferedFrame) StreamID() uint32 { - if f.inner == nil { + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner == nil { panic("frame has been released!") } - return binary.BigEndian.Uint32(f.inner.Bytes()[:4]) + return binary.BigEndian.Uint32(inner.Bytes()[:4]) } // Release releases resource. func (f *bufferedFrame) Release() { - if f != nil && f.inner != nil && atomic.AddInt32(&f.refs, -1) == 0 { - common.ReturnByteBuff(f.inner) - f.inner = nil + if f == nil { + return + } + refs := f.refs.Add(-1) + if refs > 0 { + return + } + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner != nil { + swapped := atomic.CompareAndSwapPointer(&f.innerPtr, unsafe.Pointer(inner), unsafe.Pointer(nil)) + if swapped { + common.ReturnByteBuff(inner) + } } } // Body returns frame body. func (f *bufferedFrame) Body() []byte { - if f.inner == nil { + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner == nil { return nil } - b := f.inner.Bytes() + b := inner.Bytes() _ = b[core.FrameHeaderLen-1] return b[core.FrameHeaderLen:] } // Len returns length of frame. func (f *bufferedFrame) Len() int { - if f.inner == nil { + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner == nil { return 0 } - return f.inner.Len() + return inner.Len() } // WriteTo write frame to writer. func (f *bufferedFrame) WriteTo(w io.Writer) (n int64, err error) { - if f == nil || f.inner == nil { + if f == nil { + return + } + inner := (*common.ByteBuff)(atomic.LoadPointer(&f.innerPtr)) + if inner == nil { return } - n, err = f.inner.WriteTo(w) + n, err = inner.WriteTo(w) return }