Skip to content

Commit

Permalink
Merge pull request #54 from vinted/add_streamed_snappy
Browse files Browse the repository at this point in the history
store: add streamed snappy encoding for postings list
  • Loading branch information
GiedriusS authored Apr 24, 2023
2 parents 240ffef + 1bebd14 commit 5eaabb0
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 17 deletions.
6 changes: 3 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2338,9 +2338,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
l index.Postings
err error
)
if isDiffVarintSnappyEncodedPostings(b) {
if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
s := time.Now()
clPostings, err := diffVarintSnappyDecode(b)
clPostings, err := getDecodingFunction(b)(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
Expand Down Expand Up @@ -2440,7 +2440,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
compressions++
s := time.Now()
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyEncode(bep, bep.length())
data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
Expand Down
158 changes: 157 additions & 1 deletion pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package store

import (
"bytes"
"encoding/binary"
"io"
"sync"

"github.com/golang/snappy"
Expand All @@ -25,14 +27,168 @@ import (
// significantly (to about 20% of original), snappy then halves it to ~10% of the original.

// Codec headers. Encoded postings start with one of these prefixes, which is
// how the decoder side tells the encodings apart.
const (
codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
codecHeaderStreamedSnappy = "dss" // As in "diff+varint+streamed snappy".
)

// getDecodingFunction picks the decoder matching the codec header found at the
// start of input. It returns nil when input carries neither the
// diff+varint+snappy header nor the streamed snappy header, so callers must
// check the header (or the result) before invoking the returned function.
func getDecodingFunction(input []byte) func([]byte) (closeablePostings, error) {
	switch {
	case isDiffVarintSnappyEncodedPostings(input):
		return diffVarintSnappyDecode
	case isDiffVarintSnappyStreamedEncodedPostings(input):
		return diffVarintSnappyStreamedDecode
	default:
		return nil
	}
}

// isDiffVarintSnappyEncodedPostings reports whether input begins with the
// codecHeaderSnappy header, i.e. whether it looks like output of the
// diff+varint+snappy codec. Only the prefix is inspected; the payload is not
// validated.
func isDiffVarintSnappyEncodedPostings(input []byte) bool {
	header := []byte(codecHeaderSnappy)
	return bytes.HasPrefix(input, header)
}

// isDiffVarintSnappyStreamedEncodedPostings reports whether input begins with
// the codecHeaderStreamedSnappy header, i.e. whether it looks like output of
// the diff+varint+streamed snappy codec. Only the prefix is inspected; the
// payload is not validated.
func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
	header := []byte(codecHeaderStreamedSnappy)
	return bytes.HasPrefix(input, header)
}

// writeUvarint writes x to w in the unsigned varint format used by
// encoding/binary (7 payload bits per byte, least-significant group first,
// high bit set on every byte except the last), one byte per Write call.
//
// oneByteSlice is caller-provided scratch space that lets the caller avoid an
// allocation per value; it must have room for at least one byte, and only its
// first byte is used.
//
// Any error returned by w is passed through unchanged. If w accepts fewer
// bytes than requested without reporting an error, io.ErrShortWrite is
// returned.
func writeUvarint(w io.Writer, oneByteSlice []byte, x uint64) error {
	// Write exactly one byte even when the caller hands in a longer scratch slice.
	buf := oneByteSlice[:1]
	for {
		last := x < 0x80
		buf[0] = byte(x)
		if !last {
			buf[0] |= 0x80 // Continuation bit: more groups follow.
		}

		n, err := w.Write(buf)
		if err != nil {
			return err
		}
		if n != 1 {
			return io.ErrShortWrite
		}
		if last {
			return nil
		}
		x >>= 7
	}
}

// Pools of snappy stream codecs, reused across encode/decode calls.
// Neither pool sets New, so Get returns nil when the pool is empty and the
// caller is responsible for constructing a fresh instance.
var (
	// snappyWriterPool holds *snappy.Writer values used by diffVarintSnappyStreamedEncode.
	snappyWriterPool sync.Pool
	// snappyReaderPool holds *snappy.Reader values used by streamed postings iterators.
	snappyReaderPool sync.Pool
)

// diffVarintSnappyStreamedEncode encodes postings as uvarint-encoded deltas
// between consecutive entries and compresses them with a streaming snappy
// writer. The returned byte slice starts with the codecHeaderStreamedSnappy
// header.
//
// length is used only to pre-size the output buffer (presumably the number of
// postings in p); the buffer grows as needed when the estimate is too low.
//
// An error is returned if an entry is smaller than its predecessor, if the
// postings iterator reports an error, or if writing to / closing the snappy
// stream fails.
func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) {
	// Size hint: 1.25 bytes per posting + header + snappy stream beginning.
	sizeHint := 10 + snappy.MaxEncodedLen(5*length/4) + len(codecHeaderStreamedSnappy)

	// The header and the compressed stream share one growable buffer. The
	// result must be taken from the buffer itself: the hint above is only an
	// estimate, and once the buffer outgrows it the data lives in a new
	// backing array.
	compressedBuf := bytes.NewBuffer(make([]byte, 0, sizeHint))
	// bytes.Buffer.WriteString is documented to always return a nil error.
	compressedBuf.WriteString(codecHeaderStreamedSnappy)

	var sw *snappy.Writer
	if pooledSW := snappyWriterPool.Get(); pooledSW == nil {
		sw = snappy.NewBufferedWriter(compressedBuf)
	} else {
		sw = pooledSW.(*snappy.Writer)
		sw.Reset(compressedBuf)
	}
	// The writer is Reset on its next use, so it is safe to pool it even
	// after an early error return.
	defer snappyWriterPool.Put(sw)

	// Scratch space reused for every byte handed to writeUvarint.
	oneByteSlice := make([]byte, 1)

	prev := storage.SeriesRef(0)
	for p.Next() {
		v := p.At()
		if v < prev {
			return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
		}

		if err := writeUvarint(sw, oneByteSlice, uint64(v-prev)); err != nil {
			return nil, errors.Wrap(err, "writing uvarint encoded byte")
		}
		prev = v
	}
	if err := p.Err(); err != nil {
		return nil, err
	}
	// Close flushes any buffered data into compressedBuf.
	if err := sw.Close(); err != nil {
		return nil, errors.Wrap(err, "closing snappy stream writer")
	}

	return compressedBuf.Bytes(), nil
}

// diffVarintSnappyStreamedDecode returns a postings iterator over input, which
// must start with the codecHeaderStreamedSnappy header. Decompression happens
// lazily as the returned postings are iterated. An error is returned when the
// header is missing.
func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) {
	if !isDiffVarintSnappyStreamedEncodedPostings(input) {
		return nil, errors.New("header not found")
	}

	headerLen := len(codecHeaderStreamedSnappy)
	return newStreamedDiffVarintPostings(input[headerLen:]), nil
}

type streamedDiffVarintPostings struct {
cur storage.SeriesRef

sr *snappy.Reader
err error
}

func newStreamedDiffVarintPostings(input []byte) closeablePostings {
var sr *snappy.Reader

srPooled := snappyReaderPool.Get()
if srPooled == nil {
sr = snappy.NewReader(bytes.NewBuffer(input))
} else {
sr = srPooled.(*snappy.Reader)
sr.Reset(bytes.NewBuffer(input))
}

return &streamedDiffVarintPostings{sr: sr}
}

func (it *streamedDiffVarintPostings) close() {
snappyReaderPool.Put(it.sr)
}

func (it *streamedDiffVarintPostings) At() storage.SeriesRef {
return it.cur
}

func (it *streamedDiffVarintPostings) Next() bool {
val, err := binary.ReadUvarint(it.sr)
if err != nil {
if err != io.EOF {
it.err = err
}
return false
}

it.cur = it.cur + storage.SeriesRef(val)
return true
}

func (it *streamedDiffVarintPostings) Err() error {
return it.err
}

func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool {
if it.cur >= x {
return true
}

// We cannot do any search due to how values are stored,
// so we simply advance until we find the right value.
for it.Next() {
if it.At() >= x {
return true
}
}

return false
}

// diffVarintSnappyEncode encodes postings into diff+varint representation,
// and applies snappy compression on the result.
// Returned byte slice starts with codecHeaderSnappy header.
Expand Down
62 changes: 49 additions & 13 deletions pkg/store/postings_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func TestDiffVarintCodec(t *testing.T) {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
}

for postingName, postings := range postingsMap {
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *uint64Postings) len() int {
return len(p.vals)
}

func BenchmarkEncodePostings(b *testing.B) {
func BenchmarkPostingsEncodingDecoding(b *testing.B) {
const max = 1000000
r := rand.New(rand.NewSource(0))

Expand All @@ -208,16 +209,51 @@ func BenchmarkEncodePostings(b *testing.B) {
p[ix] = p[ix-1] + storage.SeriesRef(d)
}

codecs := map[string]struct {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
}

b.ReportAllocs()

for _, count := range []int{10000, 100000, 1000000} {
b.Run(strconv.Itoa(count), func(b *testing.B) {
for i := 0; i < b.N; i++ {
ps := &uint64Postings{vals: p[:count]}

_, err := diffVarintEncodeNoHeader(ps, ps.len())
if err != nil {
b.Fatal(err)
}
}
})
for codecName, codecFns := range codecs {
b.Run(strconv.Itoa(count), func(b *testing.B) {
b.Run(codecName, func(b *testing.B) {
b.Run("encode", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ps := &uint64Postings{vals: p[:count]}

_, err := codecFns.codingFunction(ps, ps.len())
if err != nil {
b.Fatal(err)
}
}
})
b.Run("decode", func(b *testing.B) {
ps := &uint64Postings{vals: p[:count]}

encoded, err := codecFns.codingFunction(ps, ps.len())
if err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := codecFns.decodingFunction(encoded)
if err != nil {
b.Fatal(err)
}
}
})

})
})
}
}
}

0 comments on commit 5eaabb0

Please sign in to comment.