Skip to content

Commit

Permalink
Merge pull request #121 from Netflix/more-effective-pooling
Browse files Browse the repository at this point in the history
More effective pooling
  • Loading branch information
ScottMansfield authored Apr 13, 2017
2 parents 1d5d897 + f3a0338 commit 992c531
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 48 deletions.
10 changes: 5 additions & 5 deletions client/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
./blast
./fill
./setget
./setops
./sizes
/blast
/fill
/setget
/setops
/sizes
10 changes: 5 additions & 5 deletions client/binprot/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var bufPool = &sync.Pool{

var resPool = &sync.Pool{
New: func() interface{} {
return res{}
return &res{}
},
}

Expand Down Expand Up @@ -125,20 +125,20 @@ type res struct {
CAS uint64
}

func readRes(r io.Reader) (res, error) {
func readRes(r io.Reader) (*res, error) {
buf := bufPool.Get().([]byte)

if _, err := io.ReadAtLeast(r, buf, 24); err != nil {
bufPool.Put(buf)
return res{}, err
return nil, err
}

if buf[0] != 0x81 {
bufPool.Put(buf)
return res{}, errors.New("Bad Magic")
return nil, errors.New("Bad Magic")
}

res := resPool.Get().(res)
res := resPool.Get().(*res)
res.Magic = buf[0]
res.Opcode = buf[1]
res.KeyLen = uint16(buf[2])<<8 | uint16(buf[3])
Expand Down
4 changes: 2 additions & 2 deletions handlers/memcached/chunked/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ var (
MetricCmdPrependMissesTokenL2 = metrics.AddCounter("cmd_prepend_misses_token_l2", nil)
)

func readResponseHeader(r *bufio.Reader) (binprot.ResponseHeader, error) {
func readResponseHeader(r *bufio.Reader) (*binprot.ResponseHeader, error) {
resHeader, err := binprot.ReadResponseHeader(r)
if err != nil {
return binprot.ResponseHeader{}, err
return nil, err
}

if err := binprot.DecodeError(resHeader); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions handlers/memcached/std/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/netflix/rend/protocol/binprot"
)

func readResponseHeader(r *bufio.Reader) (binprot.ResponseHeader, error) {
func readResponseHeader(r *bufio.Reader) (*binprot.ResponseHeader, error) {
resHeader, err := binprot.ReadResponseHeader(r)
if err != nil {
return binprot.ResponseHeader{}, err
return nil, err
}

if err := binprot.DecodeError(resHeader); err != nil {
Expand Down
1 change: 1 addition & 0 deletions orcas/locked.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func LockedWithExisting(oc OrcaConst, locksetID uint32) OrcaConst {

func (l *LockedOrca) getlock(key []byte, read bool) sync.Locker {
h := l.hpool.Get().(hash.Hash32)
defer l.hpool.Put(h)
h.Reset()

// Calculate bucket using hash and mod. hash.Hash.Write() never returns an error.
Expand Down
39 changes: 19 additions & 20 deletions protocol/binprot/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package binprot

import (
"encoding/binary"
"fmt"
"io"
"sync"

Expand Down Expand Up @@ -58,8 +59,8 @@ type ResponseHeader struct {
CASToken uint64
}

func makeRequestHeader(opcode uint8, keyLength, extraLength, totalBodyLength int, opaque uint32) RequestHeader {
rh := reqHeadPool.Get().(RequestHeader)
func makeRequestHeader(opcode uint8, keyLength, extraLength, totalBodyLength int, opaque uint32) *RequestHeader {
rh := reqHeadPool.Get().(*RequestHeader)
rh.Magic = MagicRequest
rh.Opcode = opcode
rh.KeyLength = uint16(keyLength)
Expand All @@ -81,42 +82,40 @@ var bufPool = &sync.Pool{

var resHeadPool = &sync.Pool{
New: func() interface{} {
return ResponseHeader{}
return new(ResponseHeader)
},
}

func PutResponseHeader(rh ResponseHeader) {
// PutResponseHeader returns the response header to the pool
func PutResponseHeader(rh *ResponseHeader) {
resHeadPool.Put(rh)
}

var reqHeadPool = &sync.Pool{
New: func() interface{} {
return RequestHeader{}
return new(RequestHeader)
},
}

var (
emptyResHeader = ResponseHeader{}
emptyReqHeader = RequestHeader{}
)

func readRequestHeader(r io.Reader) (RequestHeader, error) {
func readRequestHeader(r io.Reader) (*RequestHeader, error) {
buf := bufPool.Get().([]byte)

br, err := io.ReadAtLeast(r, buf, ReqHeaderLen)
metrics.IncCounterBy(common.MetricBytesReadRemote, uint64(br))
if err != nil {
bufPool.Put(buf)
return emptyReqHeader, err
return nil, err
}

if buf[0] != MagicRequest {
fmt.Printf("%#v\n", buf)
bufPool.Put(buf)
metrics.IncCounter(MetricBinaryRequestHeadersBadMagic)
return emptyReqHeader, ErrBadMagic
return nil, ErrBadMagic
}

rh := reqHeadPool.Get().(RequestHeader)
rh := reqHeadPool.Get().(*RequestHeader)

rh.Magic = buf[0]
rh.Opcode = buf[1]
rh.KeyLength = binary.BigEndian.Uint16(buf[2:4])
Expand All @@ -139,7 +138,7 @@ func readRequestHeader(r io.Reader) (RequestHeader, error) {
return rh, nil
}

func writeRequestHeader(w io.Writer, rh RequestHeader) error {
func writeRequestHeader(w io.Writer, rh *RequestHeader) error {
buf := bufPool.Get().([]byte)

buf[0] = rh.Magic
Expand All @@ -164,23 +163,23 @@ func writeRequestHeader(w io.Writer, rh RequestHeader) error {
return err
}

func ReadResponseHeader(r io.Reader) (ResponseHeader, error) {
func ReadResponseHeader(r io.Reader) (*ResponseHeader, error) {
buf := bufPool.Get().([]byte)

br, err := io.ReadAtLeast(r, buf, resHeaderLen)
metrics.IncCounterBy(common.MetricBytesReadRemote, uint64(br))
if err != nil {
bufPool.Put(buf)
return emptyResHeader, err
return nil, err
}

if buf[0] != MagicResponse {
bufPool.Put(buf)
metrics.IncCounter(MetricBinaryResponseHeadersBadMagic)
return emptyResHeader, ErrBadMagic
return nil, ErrBadMagic
}

rh := resHeadPool.Get().(ResponseHeader)
rh := resHeadPool.Get().(*ResponseHeader)
rh.Magic = buf[0]
rh.Opcode = buf[1]
rh.KeyLength = binary.BigEndian.Uint16(buf[2:4])
Expand All @@ -201,7 +200,7 @@ func ReadResponseHeader(r io.Reader) (ResponseHeader, error) {
return rh, nil
}

func writeResponseHeader(w io.Writer, rh ResponseHeader) error {
func writeResponseHeader(w io.Writer, rh *ResponseHeader) error {
buf := bufPool.Get().([]byte)

buf[0] = rh.Magic
Expand Down
27 changes: 20 additions & 7 deletions protocol/binprot/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ func (b BinaryParser) Parse() (common.Request, common.RequestType, uint64, error
// read in the full header before any variable length fields
reqHeader, err := readRequestHeader(b.reader)
start := timer.Now()
defer reqHeadPool.Put(reqHeader)

if err != nil {
return nil, common.RequestUnknown, start, err
}

defer reqHeadPool.Put(reqHeader)

switch reqHeader.Opcode {
case OpcodeSet:
return setRequest(b.reader, reqHeader, common.RequestSet, false, start)
Expand Down Expand Up @@ -291,13 +292,15 @@ func (b BinaryParser) Parse() (common.Request, common.RequestType, uint64, error
return nil, common.RequestUnknown, start, common.ErrUnknownCmd
}

func readBatchGet(r io.Reader, header RequestHeader) (common.GetRequest, error) {
func readBatchGet(r io.Reader, header *RequestHeader) (common.GetRequest, error) {
var keys [][]byte
var opaques []uint32
var quiet []bool
var noopOpaque uint32
var noopEnd bool

first := true

// while GETQ
// read key, read header
for header.Opcode == OpcodeGetQ {
Expand All @@ -312,7 +315,11 @@ func readBatchGet(r io.Reader, header RequestHeader) (common.GetRequest, error)
quiet = append(quiet, true)

// read in the next header
reqHeadPool.Put(header)
if !first {
reqHeadPool.Put(header)
} else {
first = false
}
header, err = readRequestHeader(r)
if err != nil {
return common.GetRequest{}, err
Expand Down Expand Up @@ -354,13 +361,15 @@ func readBatchGet(r io.Reader, header RequestHeader) (common.GetRequest, error)
}, nil
}

func readBatchGetE(r io.Reader, header RequestHeader) (common.GetRequest, error) {
func readBatchGetE(r io.Reader, header *RequestHeader) (common.GetRequest, error) {
var keys [][]byte
var opaques []uint32
var quiet []bool
var noopOpaque uint32
var noopEnd bool

first := true

// while GETQ
// read key, read header
for header.Opcode == OpcodeGetEQ {
Expand All @@ -375,7 +384,11 @@ func readBatchGetE(r io.Reader, header RequestHeader) (common.GetRequest, error)
quiet = append(quiet, true)

// read in the next header
reqHeadPool.Put(header)
if !first {
reqHeadPool.Put(header)
} else {
first = false
}
header, err = readRequestHeader(r)
if err != nil {
return common.GetRequest{}, err
Expand Down Expand Up @@ -417,7 +430,7 @@ func readBatchGetE(r io.Reader, header RequestHeader) (common.GetRequest, error)
}, nil
}

func setRequest(r io.Reader, reqHeader RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
func setRequest(r io.Reader, reqHeader *RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
// flags, exptime, key, value
flags, err := readUInt32(r)
if err != nil {
Expand Down Expand Up @@ -459,7 +472,7 @@ func setRequest(r io.Reader, reqHeader RequestHeader, reqType common.RequestType
}, reqType, start, nil
}

func appendPrependRequest(r io.Reader, reqHeader RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
func appendPrependRequest(r io.Reader, reqHeader *RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
// key, value
key, err := readString(r, reqHeader.KeyLength)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions protocol/binprot/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func (d dummyIO) Write(p []byte) (int, error) {
}

var (
reqHeaderBenchmarkSink RequestHeader
resHeaderBenchmarkSink ResponseHeader
reqHeaderBenchmarkSink *RequestHeader
resHeaderBenchmarkSink *ResponseHeader
errBenchmarkSink error
)

Expand All @@ -71,7 +71,7 @@ func BenchmarkHeaders(b *testing.B) {
}
})
b.Run("writeRequestHeader", func(b *testing.B) {
temp := RequestHeader{}
temp := &RequestHeader{}
for i := 0; i < b.N; i++ {
errBenchmarkSink = writeRequestHeader(dummyIO{}, temp)
}
Expand All @@ -82,7 +82,7 @@ func BenchmarkHeaders(b *testing.B) {
}
})
b.Run("writeResponseHeader", func(b *testing.B) {
temp := ResponseHeader{}
temp := &ResponseHeader{}
for i := 0; i < b.N; i++ {
errBenchmarkSink = writeResponseHeader(dummyIO{}, temp)
}
Expand Down
4 changes: 2 additions & 2 deletions protocol/binprot/respond.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func getCommon(w *bufio.Writer, response common.GetResponse, opcode uint8) error
func writeSuccessResponseHeader(w *bufio.Writer, opcode uint8, keyLength, extraLength,
totalBodyLength int, opaque uint32, flush bool) error {

header := resHeadPool.Get().(ResponseHeader)
header := resHeadPool.Get().(*ResponseHeader)

header.Magic = MagicResponse
header.Opcode = opcode
Expand Down Expand Up @@ -339,7 +339,7 @@ func writeSuccessResponseHeader(w *bufio.Writer, opcode uint8, keyLength, extraL
}

func writeErrorResponseHeader(w *bufio.Writer, opcode uint8, status uint16, opaque uint32) error {
header := resHeadPool.Get().(ResponseHeader)
header := resHeadPool.Get().(*ResponseHeader)

header.Magic = MagicResponse
header.Opcode = opcode
Expand Down
2 changes: 1 addition & 1 deletion protocol/binprot/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const (
StatusInvalid = uint16(0xFFFF)
)

func DecodeError(header ResponseHeader) error {
func DecodeError(header *ResponseHeader) error {
switch header.Status {
case StatusKeyEnoent:
return common.ErrKeyNotFound
Expand Down

0 comments on commit 992c531

Please sign in to comment.