Skip to content

Commit

Permalink
colblk: separate prefixes and suffixes in index blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
jbowens committed Oct 20, 2024
1 parent 4d4abbc commit d4aa43e
Show file tree
Hide file tree
Showing 22 changed files with 851 additions and 690 deletions.
203 changes: 133 additions & 70 deletions sstable/colblk/index_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package colblk

import (
"slices"
"bytes"
"encoding/binary"
"unsafe"

"github.com/cockroachdb/crlib/crbytes"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
Expand All @@ -16,7 +18,7 @@ import (
"github.com/cockroachdb/pebble/sstable/block"
)

const indexBlockCustomHeaderSize = 0
const indexBlockCustomHeaderSize = 4

// IndexBlockWriter writes columnar index blocks. The writer is used for both
// first-level and second-level index blocks. The index block schema consists of
Expand All @@ -38,39 +40,50 @@ const indexBlockCustomHeaderSize = 0
// materialize the separator key when we need to use it outside the context of
// seeking within the block.
type IndexBlockWriter struct {
separators RawBytesBuilder
offsets UintBuilder
lengths UintBuilder
blockProperties RawBytesBuilder
rows int
enc blockEncoder
separatorPrefixes PrefixBytesBuilder
separatorSuffixes RawBytesBuilder
offsets UintBuilder
lengths UintBuilder
blockProperties RawBytesBuilder
rows int
enc blockEncoder
separatorBuf []byte
split base.Split
maxSeparatorLen int
}

const (
indexBlockColumnSeparator = iota
indexBlockColumnSeparatorPrefix = iota
indexBlockColumnSeparatorSuffix
indexBlockColumnOffsets
indexBlockColumnLengths
indexBlockColumnBlockProperties
indexBlockColumnCount
)

// Init initializes the index block writer.
func (w *IndexBlockWriter) Init() {
w.separators.Init()
func (w *IndexBlockWriter) Init(split base.Split, bundleSize int) {
w.separatorPrefixes.Init(bundleSize)
w.separatorSuffixes.Init()
w.offsets.Init()
w.lengths.Init()
w.blockProperties.Init()
w.rows = 0
w.split = split
w.maxSeparatorLen = 0
}

// Reset resets the index block writer to its initial state, retaining buffers.
func (w *IndexBlockWriter) Reset() {
w.separators.Reset()
w.separatorPrefixes.Reset()
w.separatorSuffixes.Reset()
w.offsets.Reset()
w.lengths.Reset()
w.blockProperties.Reset()
w.rows = 0
w.enc.reset()
w.separatorBuf = w.separatorBuf[:0]
w.maxSeparatorLen = 0
}

// Rows returns the number of entries in the index block so far.
Expand All @@ -86,17 +99,32 @@ func (w *IndexBlockWriter) AddBlockHandle(
separator []byte, handle block.Handle, blockProperties []byte,
) int {
idx := w.rows
w.separators.Put(separator)

// Decompose the separator into prefix and suffix.
s := w.split(separator)
commonPrefixLen := s
if idx > 0 {
commonPrefixLen = crbytes.CommonPrefix(w.separatorPrefixes.UnsafeGet(idx-1), separator[:s])
}
w.maxSeparatorLen = max(w.maxSeparatorLen, len(separator))
w.separatorPrefixes.Put(separator[:s], commonPrefixLen)
w.separatorSuffixes.Put(separator[s:])
w.offsets.Set(w.rows, handle.Offset)
w.lengths.Set(w.rows, handle.Length)
w.blockProperties.Put(blockProperties)
w.rows++
return idx
}

// UnsafeSeparator returns the separator of the i'th entry.
// UnsafeSeparator returns the separator of the i'th entry. If r is the number
// of rows, it is required that r-2 <= i < r.
func (w *IndexBlockWriter) UnsafeSeparator(i int) []byte {
return w.separators.UnsafeGet(i)
if i < w.rows-2 || i >= w.rows {
panic(errors.AssertionFailedf("UnsafeSeparator(%d); writer has %d rows", i, w.rows))
}
w.separatorBuf = append(w.separatorBuf[:0], w.separatorPrefixes.UnsafeGet(i)...)
w.separatorBuf = append(w.separatorBuf, w.separatorSuffixes.UnsafeGet(i)...)
return w.separatorBuf
}

// Size returns the size of the pending index block.
Expand All @@ -106,7 +134,8 @@ func (w *IndexBlockWriter) Size() int {

func (w *IndexBlockWriter) size(rows int) int {
off := blockHeaderSize(indexBlockColumnCount, indexBlockCustomHeaderSize)
off = w.separators.Size(rows, off)
off = w.separatorPrefixes.Size(rows, off)
off = w.separatorSuffixes.Size(rows, off)
off = w.offsets.Size(rows, off)
off = w.lengths.Size(rows, off)
off = w.blockProperties.Size(rows, off)
Expand All @@ -120,13 +149,19 @@ func (w *IndexBlockWriter) Finish(rows int) []byte {
if invariants.Enabled && rows != w.rows && rows != w.rows-1 {
panic(errors.AssertionFailedf("index block has %d rows; asked to finish %d", w.rows, rows))
}

w.enc.init(w.size(rows), Header{
Version: Version1,
Columns: indexBlockColumnCount,
Rows: uint32(rows),
}, indexBlockCustomHeaderSize)
w.enc.encode(rows, &w.separators)

// Write the max key length in the custom header.
binary.LittleEndian.PutUint32(w.enc.data()[:indexBlockCustomHeaderSize], uint32(w.maxSeparatorLen))
if w.rows == 0 {
return w.enc.finish()
}
w.enc.encode(rows, &w.separatorPrefixes)
w.enc.encode(rows, &w.separatorSuffixes)
w.enc.encode(rows, &w.offsets)
w.enc.encode(rows, &w.lengths)
w.enc.encode(rows, &w.blockProperties)
Expand All @@ -135,21 +170,27 @@ func (w *IndexBlockWriter) Finish(rows int) []byte {

// An IndexBlockDecoder reads columnar index blocks.
type IndexBlockDecoder struct {
separators RawBytes
offsets UnsafeUints
lengths UnsafeUints // only used for second-level index blocks
blockProps RawBytes
bd BlockDecoder
separatorPrefixes PrefixBytes
separatorSuffixes RawBytes
offsets UnsafeUints
lengths UnsafeUints // only used for second-level index blocks
blockProps RawBytes
bd BlockDecoder
maxSeparatorLen uint32
}

// Init initializes the index block decoder with the given serialized index
// block.
func (r *IndexBlockDecoder) Init(data []byte) {
r.bd.Init(data, indexBlockCustomHeaderSize)
r.separators = r.bd.RawBytes(indexBlockColumnSeparator)
r.offsets = r.bd.Uints(indexBlockColumnOffsets)
r.lengths = r.bd.Uints(indexBlockColumnLengths)
r.blockProps = r.bd.RawBytes(indexBlockColumnBlockProperties)
r.maxSeparatorLen = binary.LittleEndian.Uint32(data[:indexBlockCustomHeaderSize])
if r.bd.header.Rows > 0 {
r.separatorPrefixes = r.bd.PrefixBytes(indexBlockColumnSeparatorPrefix)
r.separatorSuffixes = r.bd.RawBytes(indexBlockColumnSeparatorSuffix)
r.offsets = r.bd.Uints(indexBlockColumnOffsets)
r.lengths = r.bd.Uints(indexBlockColumnLengths)
r.blockProps = r.bd.RawBytes(indexBlockColumnBlockProperties)
}
}

// DebugString prints a human-readable explanation of the keyspan block's binary
Expand All @@ -173,6 +214,7 @@ func (r *IndexBlockDecoder) Describe(f *binfmt.Formatter, tp treeprinter.Node) {
f.SetAnchorOffset()

n := tp.Child("index block header")
f.HexBytesln(4, "maximum sep length: %d", r.maxSeparatorLen)
r.bd.headerToBinFormatter(f, n)
for i := 0; i < indexBlockColumnCount; i++ {
r.bd.columnToBinFormatter(f, n, i, int(r.bd.header.Rows))
Expand All @@ -197,7 +239,7 @@ type IndexIter struct {
// TODO(radu): remove allocDecoder and require any Init callers to provide the
// decoder.
allocDecoder IndexBlockDecoder
keyBuf []byte
sepBuf PrefixBytesIter
}

// Assert that IndexIter satisfies the block.IndexBlockIterator interface.
Expand All @@ -215,7 +257,10 @@ func (i *IndexIter) InitWithDecoder(
i.syntheticPrefix = transforms.SyntheticPrefix
i.syntheticSuffix = transforms.SyntheticSuffix
i.noTransforms = !transforms.SyntheticPrefix.IsSet() && !transforms.SyntheticSuffix.IsSet()
// Leave h, allocDecoder, keyBuf unchanged.
sepBufBuf := i.sepBuf.Buf
i.sepBuf = PrefixBytesIter{Buf: sepBufBuf}
i.sepBuf.Init(int(i.d.maxSeparatorLen)+len(transforms.SyntheticPrefix)+len(transforms.SyntheticSuffix), transforms.SyntheticPrefix)
// Leave h, allocDecoder unchanged.
}

// Init initializes an iterator from the provided block data slice.
Expand Down Expand Up @@ -285,11 +330,11 @@ func (i *IndexIter) Handle() block.BufferHandle {
// Separator returns the separator at the iterator's current position. The
// iterator must be positioned at a valid row.
func (i *IndexIter) Separator() []byte {
key := i.d.separators.At(i.row)
if i.noTransforms {
return key
i.d.separatorPrefixes.SetAt(&i.sepBuf, i.row)
if i.syntheticSuffix.IsSet() {
return append(i.sepBuf.Buf, i.syntheticSuffix...)
}
return i.applyTransforms(key)
return append(i.sepBuf.Buf, i.d.separatorSuffixes.At(i.row)...)
}

// SeparatorLT returns true if the separator at the iterator's current
Expand All @@ -300,20 +345,9 @@ func (i *IndexIter) SeparatorLT(key []byte) bool {

// SeparatorGT returns true if the separator at the iterator's current position
// is strictly greater than (or equal, if orEqual=true) the provided key.
func (i *IndexIter) SeparatorGT(key []byte, inclusively bool) bool {
func (i *IndexIter) SeparatorGT(key []byte, orEqual bool) bool {
cmp := i.compare(i.Separator(), key)
return cmp > 0 || (cmp == 0 && inclusively)
}

func (i *IndexIter) applyTransforms(key []byte) []byte {
if i.syntheticSuffix.IsSet() {
key = key[:i.split(key)]
}
i.keyBuf = slices.Grow(i.keyBuf[:0], len(i.syntheticPrefix)+len(key)+len(i.syntheticSuffix))
i.keyBuf = append(i.keyBuf, i.syntheticPrefix...)
i.keyBuf = append(i.keyBuf, key...)
i.keyBuf = append(i.keyBuf, i.syntheticSuffix...)
return i.keyBuf
return cmp > 0 || (cmp == 0 && orEqual)
}

// BlockHandleWithProperties decodes the block handle with any encoded
Expand All @@ -332,32 +366,61 @@ func (i *IndexIter) BlockHandleWithProperties() (block.HandleWithProperties, err
// greater or equal to the given key. It returns false if the seek key is
// greater than all index block separators.
func (i *IndexIter) SeekGE(key []byte) bool {
// Define f(-1) == false and f(upper) == true.
// Invariant: f(index-1) == false, f(upper) == true.
index, upper := 0, i.n
for index < upper {
h := int(uint(index+upper) >> 1) // avoid overflow when computing h
// index ≤ h < upper

// TODO(jackson): Is Bytes.At or Bytes.Slice(Bytes.Offset(h),
// Bytes.Offset(h+1)) faster in this code?
separator := i.d.separators.At(h)
if !i.noTransforms {
// TODO(radu): compare without materializing the transformed key.
separator = i.applyTransforms(separator)
if i.n == 0 {
i.row = 0
return i.row < i.n
}
if i.syntheticPrefix.IsSet() {
var keyPrefix []byte
keyPrefix, key = splitKey(key, len(i.syntheticPrefix))
if cmp := bytes.Compare(keyPrefix, i.syntheticPrefix); cmp != 0 {
if cmp < 0 {
i.row = 0
return i.row < i.n
}
i.row = i.n
return false
}
// TODO(radu): experiment with splitting the separator prefix and suffix in
// separate columns and using bytes.Compare() on the prefix in the hot path.
c := i.compare(key, separator)
if c > 0 {
index = h + 1 // preserves f(index-1) == false
} else {
upper = h // preserves f(upper) == true
}
s := i.split(key)
// Search for the separator prefix.
var eq bool
i.row, eq = i.d.separatorPrefixes.Search(key[:s])
if !eq {
return i.row < i.n
}

// The separator prefix is equal, so we need to compare against the suffix
// as well.
keySuffix := key[s:]
if i.syntheticSuffix.IsSet() {
// NB: With synthetic suffix, it's required that each key within the
// sstable have a unique prefix. This ensures that there is at most 1
// key with the prefix key[:s], and we know that the separator at
// i.row+1 has a new prefix.
if i.compare(i.syntheticSuffix, keySuffix) < 0 {
i.row++
}
return i.row < i.n
}
// index == upper, f(index-1) == false, and f(upper) (= f(index)) == true => answer is index.
i.row = index
return index < i.n
if i.compare(i.d.separatorSuffixes.At(i.row), keySuffix) >= 0 {
// Fast path; we landed on a separator that is greater than or equal to
// the seek key.
return i.row < i.n
}

// Fall back to a slow scan forward.
//
// TODO(jackson): This can be improved by adding a PrefixBytes.NextUniqueKey
// method and then binary searching among the suffixes. The logic for
// implementing NextUniqueKey is finicky requiring lots of delicate fiddling
// with offset indexes, so it's deferred for now.
for i.row++; i.row < i.d.bd.Rows(); i.row++ {
if i.SeparatorGT(key, true /* orEqual */) {
return i.row < i.n
}
}
return false
}

// First seeks index iterator to the first block entry. It returns false if the
Expand Down Expand Up @@ -393,6 +456,6 @@ func (i *IndexIter) Prev() bool {
// Close closes the iterator, releasing any resources it holds.
func (i *IndexIter) Close() error {
i.h.Release()
*i = IndexIter{}
i.h = block.BufferHandle{}
return nil
}
4 changes: 2 additions & 2 deletions sstable/colblk/index_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestIndexBlock(t *testing.T) {
switch d.Cmd {
case "build":
var w IndexBlockWriter
w.Init()
w.Init(testkeys.Comparer.Split, 16)
for _, line := range strings.Split(d.Input, "\n") {
fields := strings.Fields(line)
var err error
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestIndexBlock(t *testing.T) {
// InitHandle.
func TestIndexIterInitHandle(t *testing.T) {
var w IndexBlockWriter
w.Init()
w.Init(testkeys.Comparer.Split, 16)
bh1 := block.Handle{Offset: 0, Length: 2000}
bh2 := block.Handle{Offset: 2008, Length: 1000}
w.AddBlockHandle([]byte("a"), bh1, nil)
Expand Down
Loading

0 comments on commit d4aa43e

Please sign in to comment.