Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
add bitmapPostings
Browse files Browse the repository at this point in the history
Signed-off-by: naivewong <[email protected]>
  • Loading branch information
naivewong committed Jun 17, 2019
1 parent bf6c0ae commit ef22dcd
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 2 deletions.
5 changes: 5 additions & 0 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
writeDeltaBlockPostings(&w.buf2, refs)
case 4:
writeBaseDeltaBlockPostings(&w.buf2, refs)
case 5:
writeBitmapPostings(&w.buf2, refs)
}

w.uint32s = refs
Expand Down Expand Up @@ -1061,6 +1063,9 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
case 4:
l := d.Get()
return n, newBaseDeltaBlockPostings(l, n), d.Err()
case 5:
l := d.Get()
return n, newBitmapPostings(l), d.Err()
default:
return n, EmptyPostings(), d.Err()
}
Expand Down
7 changes: 7 additions & 0 deletions index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/tsdb/encoding"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/fileutil"
)

type series struct {
Expand Down Expand Up @@ -338,6 +339,12 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close()
testutil.Ok(t, err)

f, err := fileutil.OpenMmapFile(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
toc, err := NewTOCFromByteSlice(realByteSlice(f.Bytes()))
testutil.Ok(t, err)
t.Log("size of postings =", toc.LabelIndicesTable - toc.Postings)

ir, err := NewFileReader(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)

Expand Down
125 changes: 123 additions & 2 deletions index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,8 @@ func (it *bigEndianPostings) Err() error {
return nil
}

// 1 is bigEndian, 2 is baseDelta, 3 is deltaBlock, 4 is baseDeltaBlock.
const postingsType = 4
// 1 is bigEndian, 2 is baseDelta, 3 is deltaBlock, 4 is baseDeltaBlock, 5 is bitmapPostings.
const postingsType = 5

type bitSlice struct {
bstream []byte
Expand Down Expand Up @@ -1124,3 +1124,124 @@ func writeBaseDeltaBlockPostings(e *encoding.Encbuf, arr []uint32) {
e.B = e.B[:len(e.B)-(len(e.B)-startLen)%deltaBlockSize]
}
}

// bitmapBits is the number of low-order bits of a posting that select a bit
// inside a block's bitmap; the remaining high-order bits form the block key.
// One bitmap therefore holds 2^bitmapBits bits:
// 8bits -> 256/8=32bytes, 12bits -> 4096/8=512bytes, 16bits -> 65536/8=8192bytes.
const bitmapBits = 8

// bitmapPostings iterates over postings stored as a sequence of fixed-size
// bitmap blocks.
//
// Bitmap block format.
// ┌──────────┬────────┐
// │ key <4b> │ bitmap │
// └──────────┴────────┘
//
// The key is a big-endian uint32. A set bit at byte b, bit p of the bitmap
// (p = 0 is the most significant bit) encodes the value
// key<<bitmapBits + b*8 + p.
type bitmapPostings struct {
	bs         []byte // Remaining encoded data; it starts at the block currently being read.
	cur        uint64 // Value most recently produced by Next or Seek.
	inside     bool   // Whether key has already been loaded for the block at the start of bs.
	idx1       int    // Byte offset inside the current bitmap of the next bit to test.
	idx2       int    // Bit offset (0 = most significant) inside that byte.
	bitmapSize int    // Size of a single bitmap in bytes.
	key        uint32 // Key of the block currently being read.
}

// newBitmapPostings returns an iterator over the bitmap blocks in bstream.
// The slice is stored as-is, not copied.
func newBitmapPostings(bstream []byte) *bitmapPostings {
	return &bitmapPostings{bs: bstream, bitmapSize: 1 << (bitmapBits - 3)}
}

// At returns the value produced by the last successful Next or Seek.
// It is 0 before the iterator has been advanced.
func (it *bitmapPostings) At() uint64 {
	return it.cur
}

// Next advances to the next set bit, moving on to following blocks as needed.
// It returns false once fewer bytes than one complete block (key + bitmap)
// remain.
func (it *bitmapPostings) Next() bool {
	const keySize = 4

	// Iterate instead of recursing so that the stack depth does not depend on
	// the number of blocks that have to be skipped.
	for {
		if !it.inside {
			if len(it.bs)-keySize < it.bitmapSize {
				return false
			}
			it.key = binary.BigEndian.Uint32(it.bs)
			it.inside = true
		}

		bitmap := it.bs[keySize : keySize+it.bitmapSize]
		for it.idx1 < it.bitmapSize {
			b := bitmap[it.idx1]
			if b == 0 {
				// Nothing set in this byte; skip it without testing each bit.
				it.idx1++
				it.idx2 = 0
				continue
			}
			for it.idx2 < 8 {
				bit := it.idx2
				it.idx2++
				if b&(1<<uint(7-bit)) == 0 {
					continue
				}
				// Widen the key before shifting: shifting in uint32 would wrap
				// for keys >= 2^(32-bitmapBits).
				it.cur = uint64(it.key)<<bitmapBits + uint64(it.idx1*8+bit)
				if it.idx2 == 8 {
					it.idx1++
					it.idx2 = 0
				}
				return true
			}
			it.idx1++
			it.idx2 = 0
		}

		// Current block is exhausted; drop it and continue with the next one.
		it.bs = it.bs[keySize+it.bitmapSize:]
		it.inside = false
		it.idx1, it.idx2 = 0, 0
	}
}

// Seek advances the iterator to the first value >= x and reports whether such
// a value exists. If the current value is already >= x the iterator is left
// untouched and true is returned; the iterator never moves backwards.
func (it *bitmapPostings) Seek(x uint64) bool {
	if it.cur >= x {
		return true
	}

	blockSize := it.bitmapSize + 4
	// Compare keys as uint64 so that targets above MaxUint32 are not truncated
	// onto a small key.
	wantKey := x >> bitmapBits

	// Index of the first remaining block whose key is greater than wantKey.
	// The block right before it is the only one that may contain wantKey.
	n := sort.Search(len(it.bs)/blockSize, func(i int) bool {
		return uint64(binary.BigEndian.Uint32(it.bs[i*blockSize:])) > wantKey
	})
	if n > 1 {
		// Jump straight to that candidate block. When the candidate is the
		// block already being read (n <= 1) the position inside it is kept.
		it.bs = it.bs[(n-1)*blockSize:]
		it.inside = false
		it.idx1, it.idx2 = 0, 0
	}

	for it.Next() {
		if it.cur >= x {
			return true
		}
	}
	return false
}

// Err always returns nil; iteration over the in-memory blocks cannot fail.
func (it *bitmapPostings) Err() error {
	return nil
}

// writeBitmapPostings appends arr to e as a sequence of bitmap blocks. Each
// block is a 4-byte big-endian key (the posting shifted right by bitmapBits)
// followed by a zero-initialised bitmap in which the bit selected by the low
// bitmapBits bits of every posting sharing that key is set (bit 0 is the most
// significant bit of a byte).
//
// A new block is started whenever the key differs from the previous posting's
// key, so arr is presumably expected to be sorted — TODO confirm with callers.
func writeBitmapPostings(e *encoding.Encbuf, arr []uint32) {
	const (
		blockBytes = 1 << (bitmapBits - 3)
		lowMask    = uint32(1)<<bitmapBits - 1
	)

	// 0xffffffff can never be produced by val >> bitmapBits, so the very first
	// posting always opens a block.
	lastKey := uint32(0xffffffff)
	bitmapStart := 0 // Offset in e.B where the current block's bitmap begins.

	for _, v := range arr {
		if k := v >> bitmapBits; k != lastKey {
			lastKey = k
			e.PutBE32(k)
			bitmapStart = len(e.Get())
			for n := blockBytes; n > 0; n-- {
				e.PutByte(0)
			}
		}
		pos := int(v & lowMask)
		e.B[bitmapStart+pos/8] |= 1 << uint(7-pos%8)
	}
}
106 changes: 106 additions & 0 deletions index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,80 @@ func TestBaseDeltaBlockPostings(t *testing.T) {
})
}

// TestBitmapPostings encodes a random, strictly increasing postings list with
// writeBitmapPostings and checks that bitmapPostings reads it back through
// both Next and Seek.
func TestBitmapPostings(t *testing.T) {
	const total = 1000

	// Mock a postings list: start at 2 and grow by a random gap of at least 2.
	postings := make([]uint32, total)
	postings[0] = 2
	for i := 1; i < total; i++ {
		gap := uint32(rand.Int31n(25)) + 2
		postings[i] = postings[i-1] + gap
	}

	var buf encoding.Encbuf
	writeBitmapPostings(&buf, postings)

	t.Run("Iteration", func(t *testing.T) {
		bp := newBitmapPostings(buf.Get())
		for _, want := range postings {
			testutil.Assert(t, bp.Next(), "")
			testutil.Equals(t, uint64(want), bp.At())
		}

		testutil.Assert(t, !bp.Next(), "")
		testutil.Assert(t, bp.Err() == nil, "")
	})

	t.Run("Seek", func(t *testing.T) {
		// The cases run in order against a single iterator, so each one starts
		// from wherever the previous Seek left off.
		cases := []struct {
			seek  uint32
			val   uint32
			found bool
		}{
			{seek: postings[0] - 1, val: postings[0], found: true},
			{seek: postings[4], val: postings[4], found: true},
			{seek: postings[500] - 1, val: postings[500], found: true},
			// Repeating a seek must leave the iterator where it is.
			{seek: postings[600] + 1, val: postings[601], found: true},
			{seek: postings[600] + 1, val: postings[601], found: true},
			{seek: postings[600] + 1, val: postings[601], found: true},
			// Seeking to a value behind the current one must not move backwards.
			{seek: postings[0], val: postings[601], found: true},
			{seek: postings[600], val: postings[601], found: true},
			{seek: postings[999], val: postings[999], found: true},
			// Past the end: not found, the last value stays current.
			{seek: postings[999] + 10, val: postings[999], found: false},
		}

		bp := newBitmapPostings(buf.Get())

		for _, c := range cases {
			testutil.Equals(t, c.found, bp.Seek(uint64(c.seek)))
			testutil.Equals(t, uint64(c.val), bp.At())
			testutil.Assert(t, bp.Err() == nil, "")
		}
	})
}

func BenchmarkPostings(b *testing.B) {
num := 100000
// mock a list as postings
Expand Down Expand Up @@ -977,6 +1051,11 @@ func BenchmarkPostings(b *testing.B) {
writeBaseDeltaBlockPostings(&bufBDB, ls)
// b.Log(len(bufBDB.Get()))

// bitmapPostings.
bufBM := encoding.Encbuf{}
writeBitmapPostings(&bufBM, ls)
// b.Log("bitmapPostings size", bitmapBits, "bits =", len(bufBM.Get()))

table := []struct {
seek uint32
val uint32
Expand Down Expand Up @@ -1070,6 +1149,20 @@ func BenchmarkPostings(b *testing.B) {
testutil.Assert(bench, bdbp.Err() == nil, "")
}
})
b.Run("bitmapPostingsIteration", func(bench *testing.B) {
bench.ResetTimer()
bench.ReportAllocs()
for j := 0; j < bench.N; j++ {
bm := newBitmapPostings(bufBM.Get())

for i := 0; i < num; i++ {
testutil.Assert(bench, bm.Next() == true, "")
testutil.Equals(bench, uint64(ls[i]), bm.At())
}
testutil.Assert(bench, bm.Next() == false, "")
testutil.Assert(bench, bm.Err() == nil, "")
}
})

b.Run("bigEndianSeek", func(bench *testing.B) {
bench.ResetTimer()
Expand Down Expand Up @@ -1123,6 +1216,19 @@ func BenchmarkPostings(b *testing.B) {
}
}
})
b.Run("bitmapPostingsSeek", func(bench *testing.B) {
bench.ResetTimer()
bench.ReportAllocs()
for j := 0; j < bench.N; j++ {
bm := newBitmapPostings(bufBM.Get())

for _, v := range table {
testutil.Equals(bench, v.found, bm.Seek(uint64(v.seek)))
testutil.Equals(bench, uint64(v.val), bm.At())
testutil.Assert(bench, bm.Err() == nil, "")
}
}
})
}

func TestIntersectWithMerge(t *testing.T) {
Expand Down

0 comments on commit ef22dcd

Please sign in to comment.