From 1bebd1482d2a8c9cd06d4f97a11ce74196e74818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 20 Apr 2023 15:38:29 +0300 Subject: [PATCH] store: add streamed snappy encoding for postings list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We've noticed that decoding Snappy compressed postings list takes a lot of RAM: ``` (pprof) top Showing nodes accounting for 1427.30GB, 67.55% of 2112.82GB total Dropped 1069 nodes (cum <= 10.56GB) Showing top 10 nodes out of 82 flat flat% sum% cum cum% 0 0% 0% 1905.67GB 90.20% golang.org/x/sync/errgroup.(*Group).Go.func1 2.08GB 0.098% 0.098% 1456.94GB 68.96% github.com/thanos-io/thanos/pkg/store.(*blockSeriesClient).ExpandPostings 1.64GB 0.078% 0.18% 1454.87GB 68.86% github.com/thanos-io/thanos/pkg/store.(*bucketIndexReader).ExpandedPostings 2.31GB 0.11% 0.29% 1258.15GB 59.55% github.com/thanos-io/thanos/pkg/store.(*bucketIndexReader).fetchPostings 1.48GB 0.07% 0.36% 1219.67GB 57.73% github.com/thanos-io/thanos/pkg/store.diffVarintSnappyDecode 1215.21GB 57.52% 57.87% 1215.21GB 57.52% github.com/klauspost/compress/s2.Decode ``` This is because we are creating a new []byte slice for the decoded data each time. To avoid this RAM usage problem, let's stream the decoding from a given buffer. Since Snappy block format doesn't support streamed decoding, let's switch to Snappy stream format which is made for exactly that. Notice that our current `index.Postings` list does not support going back through Seek() even if theoretically one could want something like that. Fortunately, to search for posting intersection, we need to only go forward. Benchmark data: ``` name time/op PostingsEncodingDecoding/10000/raw/encode-16 71.6µs ± 3% PostingsEncodingDecoding/10000/raw/decode-16 76.3ns ± 4% PostingsEncodingDecoding/10000#01/snappy/encode-16 73.3µs ± 1% PostingsEncodingDecoding/10000#01/snappy/decode-16 1.63µs ± 6% PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16 111µs ± 2% PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16 14.5µs ± 7% PostingsEncodingDecoding/100000/snappyStreamed/encode-16 1.09ms ± 2% PostingsEncodingDecoding/100000/snappyStreamed/decode-16 14.4µs ± 4% PostingsEncodingDecoding/100000#01/raw/encode-16 710µs ± 1% PostingsEncodingDecoding/100000#01/raw/decode-16 79.3ns ±13% PostingsEncodingDecoding/100000#02/snappy/encode-16 719µs ± 1% PostingsEncodingDecoding/100000#02/snappy/decode-16 13.5µs ± 4% PostingsEncodingDecoding/1000000/raw/encode-16 7.14ms ± 1% PostingsEncodingDecoding/1000000/raw/decode-16 81.7ns ± 9% PostingsEncodingDecoding/1000000#01/snappy/encode-16 7.52ms ± 3% PostingsEncodingDecoding/1000000#01/snappy/decode-16 139µs ± 4% PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16 11.4ms ± 4% PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16 15.5µs ± 4% name alloc/op PostingsEncodingDecoding/10000/raw/encode-16 13.6kB ± 0% PostingsEncodingDecoding/10000/raw/decode-16 96.0B ± 0% PostingsEncodingDecoding/10000#01/snappy/encode-16 25.9kB ± 0% PostingsEncodingDecoding/10000#01/snappy/decode-16 11.0kB ± 0% PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16 16.6kB ± 0% PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16 148kB ± 0% PostingsEncodingDecoding/100000/snappyStreamed/encode-16 148kB ± 0% PostingsEncodingDecoding/100000/snappyStreamed/decode-16 148kB ± 0% PostingsEncodingDecoding/100000#01/raw/encode-16 131kB ± 0% PostingsEncodingDecoding/100000#01/raw/decode-16 96.0B ± 0% PostingsEncodingDecoding/100000#02/snappy/encode-16 254kB ± 0% PostingsEncodingDecoding/100000#02/snappy/decode-16 107kB ± 0% PostingsEncodingDecoding/1000000/raw/encode-16 1.25MB ± 0% PostingsEncodingDecoding/1000000/raw/decode-16 96.0B ± 0% PostingsEncodingDecoding/1000000#01/snappy/encode-16 2.48MB ± 0% PostingsEncodingDecoding/1000000#01/snappy/decode-16 1.05MB ± 0% PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16 1.47MB ± 0% PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16 148kB ± 0% name allocs/op PostingsEncodingDecoding/10000/raw/encode-16 2.00 ± 0% PostingsEncodingDecoding/10000/raw/decode-16 2.00 ± 0% PostingsEncodingDecoding/10000#01/snappy/encode-16 3.00 ± 0% PostingsEncodingDecoding/10000#01/snappy/decode-16 4.00 ± 0% PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16 4.00 ± 0% PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16 5.00 ± 0% PostingsEncodingDecoding/100000/snappyStreamed/encode-16 4.00 ± 0% PostingsEncodingDecoding/100000/snappyStreamed/decode-16 5.00 ± 0% PostingsEncodingDecoding/100000#01/raw/encode-16 2.00 ± 0% PostingsEncodingDecoding/100000#01/raw/decode-16 2.00 ± 0% PostingsEncodingDecoding/100000#02/snappy/encode-16 3.00 ± 0% PostingsEncodingDecoding/100000#02/snappy/decode-16 4.00 ± 0% PostingsEncodingDecoding/1000000/raw/encode-16 2.00 ± 0% PostingsEncodingDecoding/1000000/raw/decode-16 2.00 ± 0% PostingsEncodingDecoding/1000000#01/snappy/encode-16 3.00 ± 0% PostingsEncodingDecoding/1000000#01/snappy/decode-16 4.00 ± 0% PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16 4.00 ± 0% PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16 5.00 ± 0% ``` Compression ratios are still the same like previously: ``` $ /bin/go test -v -timeout 10m -run ^TestDiffVarintCodec$ github.com/thanos-io/thanos/pkg/store [snip] === RUN TestDiffVarintCodec/snappy/i!~"2.*" postings_codec_test.go:73: postings entries: 944450 postings_codec_test.go:74: original size (4*entries): 3777800 bytes postings_codec_test.go:80: encoded size 44498 bytes postings_codec_test.go:81: ratio: 0.012 === RUN TestDiffVarintCodec/snappyStreamed/i!~"2.*" postings_codec_test.go:73: postings entries: 944450 postings_codec_test.go:74: original size (4*entries): 3777800 bytes postings_codec_test.go:80: encoded size 44670 bytes postings_codec_test.go:81: ratio: 0.012 ``` Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 6 +- pkg/store/postings_codec.go | 158 ++++++++++++++++++++++++++++++- pkg/store/postings_codec_test.go | 62 +++++++++--- 3 files changed, 209 insertions(+), 17 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ad63cd8891..6674ec3e98 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2338,9 +2338,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab l index.Postings err error ) - if isDiffVarintSnappyEncodedPostings(b) { + if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) { s := time.Now() - clPostings, err := diffVarintSnappyDecode(b) + clPostings, err := getDecodingFunction(b)(b) r.stats.cachedPostingsDecompressions += 1 r.stats.CachedPostingsDecompressionTimeSum += time.Since(s) if err != nil { @@ -2440,7 +2440,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab compressions++ s := time.Now() bep := newBigEndianPostings(pBytes[4:]) - data, err := diffVarintSnappyEncode(bep, bep.length()) + data, err := diffVarintSnappyStreamedEncode(bep, bep.length()) compressionTime = time.Since(s) if err == nil { dataToCache = data diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go index f60fe7c1b2..acfd548d57 100644 --- a/pkg/store/postings_codec.go +++ b/pkg/store/postings_codec.go @@ -5,6 +5,8 @@ package store import ( "bytes" + "encoding/binary" + "io" "sync" "github.com/golang/snappy" @@ -25,14 +27,168 @@ import ( // significantly (to about 20% of original), snappy then halves it to ~10% of the original. const ( - codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". + codecHeaderSnappy = "dvs" // As in "diff+varint+snappy". + codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy". ) +func getDecodingFunction(input []byte) func([]byte) (closeablePostings, error) { + if isDiffVarintSnappyEncodedPostings(input) { + return diffVarintSnappyDecode + } + if isDiffVarintSnappyStreamedEncodedPostings(input) { + return diffVarintSnappyStreamedDecode + } + return nil +} + // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec. func isDiffVarintSnappyEncodedPostings(input []byte) bool { return bytes.HasPrefix(input, []byte(codecHeaderSnappy)) } +// isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec. +func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool { + return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy)) +} + +func writeUvarint(w io.Writer, oneByteSlice []byte, x uint64) error { + for x >= 0x80 { + oneByteSlice[0] = byte(x) | 0x80 + n, err := w.Write(oneByteSlice) + if err != nil { + return err + } + if n != 1 { + return io.EOF + } + x >>= 7 + } + oneByteSlice[0] = byte(x) + n, err := w.Write(oneByteSlice) + if err != nil { + return err + } + if n != 1 { + return io.EOF + } + return nil +} + +var snappyWriterPool, snappyReaderPool sync.Pool + +func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) { + // 1.25 bytes per postings + header + snappy stream beginning. + out := make([]byte, 0, 10+snappy.MaxEncodedLen(5*length/4)+len(codecHeaderStreamedSnappy)) + out = append(out, []byte(codecHeaderStreamedSnappy)...) + compressedBuf := bytes.NewBuffer(out[len(codecHeaderStreamedSnappy):]) + var sw *snappy.Writer + + oneByteSlice := make([]byte, 1) + + pooledSW := snappyWriterPool.Get() + if pooledSW == nil { + sw = snappy.NewBufferedWriter(compressedBuf) + } else { + sw = pooledSW.(*snappy.Writer) + sw.Reset(compressedBuf) + } + + defer func() { + snappyWriterPool.Put(sw) + }() + + prev := storage.SeriesRef(0) + for p.Next() { + v := p.At() + if v < prev { + return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev) + } + + if err := writeUvarint(sw, oneByteSlice, uint64(v-prev)); err != nil { + return nil, errors.Wrap(err, "writing uvarint encoded byte") + } + prev = v + } + if p.Err() != nil { + return nil, p.Err() + } + if err := sw.Close(); err != nil { + return nil, errors.Wrap(err, "closing snappy stream writer") + } + + return out[:len(codecHeaderStreamedSnappy)+compressedBuf.Len()], nil +} + +func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) { + if !isDiffVarintSnappyStreamedEncodedPostings(input) { + return nil, errors.New("header not found") + } + + return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):]), nil +} + +type streamedDiffVarintPostings struct { + cur storage.SeriesRef + + sr *snappy.Reader + err error +} + +func newStreamedDiffVarintPostings(input []byte) closeablePostings { + var sr *snappy.Reader + + srPooled := snappyReaderPool.Get() + if srPooled == nil { + sr = snappy.NewReader(bytes.NewBuffer(input)) + } else { + sr = srPooled.(*snappy.Reader) + sr.Reset(bytes.NewBuffer(input)) + } + + return &streamedDiffVarintPostings{sr: sr} +} + +func (it *streamedDiffVarintPostings) close() { + snappyReaderPool.Put(it.sr) +} + +func (it *streamedDiffVarintPostings) At() storage.SeriesRef { + return it.cur +} + +func (it *streamedDiffVarintPostings) Next() bool { + val, err := binary.ReadUvarint(it.sr) + if err != nil { + if err != io.EOF { + it.err = err + } + return false + } + + it.cur = it.cur + storage.SeriesRef(val) + return true +} + +func (it *streamedDiffVarintPostings) Err() error { + return it.err +} + +func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool { + if it.cur >= x { + return true + } + + // We cannot do any search due to how values are stored, + // so we simply advance until we find the right value. + for it.Next() { + if it.At() >= x { + return true + } + } + + return false +} + // diffVarintSnappyEncode encodes postings into diff+varint representation, // and applies snappy compression on the result. // Returned byte slice starts with codecHeaderSnappy header. diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 8ac86008b5..b04bb6f037 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -57,8 +57,9 @@ func TestDiffVarintCodec(t *testing.T) { codingFunction func(index.Postings, int) ([]byte, error) decodingFunction func([]byte) (closeablePostings, error) }{ - "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }}, - "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, + "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }}, + "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, + "snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode}, } for postingName, postings := range postingsMap { @@ -194,7 +195,7 @@ func (p *uint64Postings) len() int { return len(p.vals) } -func BenchmarkEncodePostings(b *testing.B) { +func BenchmarkPostingsEncodingDecoding(b *testing.B) { const max = 1000000 r := rand.New(rand.NewSource(0)) @@ -208,16 +209,51 @@ func BenchmarkEncodePostings(b *testing.B) { p[ix] = p[ix-1] + storage.SeriesRef(d) } + codecs := map[string]struct { + codingFunction func(index.Postings, int) ([]byte, error) + decodingFunction func([]byte) (closeablePostings, error) + }{ + "raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }}, + "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode}, + "snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode}, + } + + b.ReportAllocs() + for _, count := range []int{10000, 100000, 1000000} { - b.Run(strconv.Itoa(count), func(b *testing.B) { - for i := 0; i < b.N; i++ { - ps := &uint64Postings{vals: p[:count]} - - _, err := diffVarintEncodeNoHeader(ps, ps.len()) - if err != nil { - b.Fatal(err) - } - } - }) + for codecName, codecFns := range codecs { + b.Run(strconv.Itoa(count), func(b *testing.B) { + b.Run(codecName, func(b *testing.B) { + b.Run("encode", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + ps := &uint64Postings{vals: p[:count]} + + _, err := codecFns.codingFunction(ps, ps.len()) + if err != nil { + b.Fatal(err) + } + } + }) + b.Run("decode", func(b *testing.B) { + ps := &uint64Postings{vals: p[:count]} + + encoded, err := codecFns.codingFunction(ps, ps.len()) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := codecFns.decodingFunction(encoded) + if err != nil { + b.Fatal(err) + } + } + }) + + }) + }) + } } }