Skip to content

Commit

Permalink
colblk: add maximum-key-length data block header
Browse files Browse the repository at this point in the history
Add a uint32 header to data blocks that encodes the maximum length of any fully
materialized user key contained within the block. Iterators may use this to
allocate an appropriately-sized buffer upfront and avoid buffer resizing logic
when materializing user keys during iteration.
  • Loading branch information
jbowens committed Aug 15, 2024
1 parent 53f5316 commit bb3b71d
Show file tree
Hide file tree
Showing 8 changed files with 932 additions and 895 deletions.
47 changes: 33 additions & 14 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"cmp"
"context"
"encoding/binary"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -322,15 +323,8 @@ func (ks *defaultKeySeeker) MaterializeUserKey(keyIter *PrefixBytesIter, prevRow
ks.prefixes.SetAt(keyIter, row)
}
suffix := ks.suffixes.At(row)
n := keyIter.len + len(suffix)
if keyIter.cap < n {
keyIter.cap = n << 1
prevPtr := keyIter.ptr
keyIter.ptr = mallocgc(uintptr(keyIter.cap), nil, false)
memmove(keyIter.ptr, prevPtr, uintptr(keyIter.len))
}
memmove(unsafe.Pointer(uintptr(keyIter.ptr)+uintptr(keyIter.len)), unsafe.Pointer(unsafe.SliceData(suffix)), uintptr(len(suffix)))
return unsafe.Slice((*byte)(keyIter.ptr), n)
return unsafe.Slice((*byte)(keyIter.ptr), keyIter.len+len(suffix))
}

func (ks *defaultKeySeeker) Release() {
Expand Down Expand Up @@ -358,9 +352,10 @@ type DataBlockWriter struct {
// that indicates when a value is stored out-of-band in a value block.
isValueExternal BitmapBuilder

enc blockEncoder
rows int
valuePrefixTmp [1]byte
enc blockEncoder
rows int
maximumKeyLength int
valuePrefixTmp [1]byte
}

// TODO(jackson): Add an isObsolete bitmap column.
Expand All @@ -373,6 +368,12 @@ const (
dataBlockColumnMax
)

// The data block header is a 4-byte uint32 encoding the maximum length of a key
// contained within the block. This is used by iterators to avoid the need to
// grow key buffers while iterating over the block, ensuring that the key buffer
// is always sufficiently large.
const dataBlockCustomHeaderSize = 4

// Init initializes the data block writer.
func (w *DataBlockWriter) Init(schema KeySchema) {
w.Schema = schema
Expand All @@ -382,6 +383,7 @@ func (w *DataBlockWriter) Init(schema KeySchema) {
w.values.Init()
w.isValueExternal.Reset()
w.rows = 0
w.maximumKeyLength = 0
}

// Reset resets the data block writer to its initial state, retaining buffers.
Expand All @@ -392,6 +394,7 @@ func (w *DataBlockWriter) Reset() {
w.values.Reset()
w.isValueExternal.Reset()
w.rows = 0
w.maximumKeyLength = 0
w.enc.reset()
}

Expand Down Expand Up @@ -447,6 +450,9 @@ func (w *DataBlockWriter) Add(
// bitmap and know there is no value prefix byte if !isValueExternal.
w.values.Put(value)
}
if len(ikey.UserKey) > int(w.maximumKeyLength) {
w.maximumKeyLength = len(ikey.UserKey)
}
w.rows++
}

Expand All @@ -457,7 +463,7 @@ func (w *DataBlockWriter) Rows() int {

// Size returns the size of the current pending data block.
func (w *DataBlockWriter) Size() int {
off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, 0)
off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, dataBlockCustomHeaderSize)
off = w.KeyWriter.Size(w.rows, off)
off = w.trailers.Size(w.rows, off)
off = w.prefixSame.Size(w.rows, off)
Expand All @@ -475,7 +481,10 @@ func (w *DataBlockWriter) Finish() []byte {
Columns: uint16(cols),
Rows: uint32(w.rows),
}
w.enc.init(w.Size(), h, 0)
w.enc.init(w.Size(), h, dataBlockCustomHeaderSize)

// Write the max key length in the custom header.
binary.LittleEndian.PutUint32(w.enc.data()[:dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))

// Write the user-defined key columns.
w.enc.encode(w.rows, w.KeyWriter)
Expand Down Expand Up @@ -519,6 +528,10 @@ type DataBlockReader struct {
// true, the value contains a ValuePrefix byte followed by an encoded value
// handle indicating the value's location within the value block(s).
isValueExternal Bitmap
// maximumKeyLength is the maximum length of a user key in the block.
// Iterators may use it to allocate a sufficiently large buffer up front,
// and elide size checks during iteration.
maximumKeyLength uint32
}

// BlockReader returns a pointer to the underlying BlockReader.
Expand All @@ -528,14 +541,17 @@ func (r *DataBlockReader) BlockReader() *BlockReader {

// Init initializes the data block reader with the given serialized data block.
func (r *DataBlockReader) Init(schema KeySchema, data []byte) {
r.r.Init(data, 0)
r.r.Init(data, dataBlockCustomHeaderSize)
r.trailers = r.r.Uint64s(len(schema.ColumnTypes) + dataBlockColumnTrailer)
r.prefixChanged = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnPrefixChanged)
r.values = r.r.RawBytes(len(schema.ColumnTypes) + dataBlockColumnValue)
r.isValueExternal = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsValueExternal)
r.maximumKeyLength = binary.LittleEndian.Uint32(data[:dataBlockCustomHeaderSize])
}

func (r *DataBlockReader) toFormatter(f *binfmt.Formatter) {
f.CommentLine("data block header")
f.HexBytesln(4, "maximum key length: %d", r.maximumKeyLength)
r.r.headerToBinFormatter(f)
for i := 0; i < int(r.r.header.Columns); i++ {
r.r.columnToBinFormatter(f, i, int(r.r.header.Rows))
Expand Down Expand Up @@ -572,6 +588,9 @@ func (i *DataBlockIter) Init(
kv: base.InternalKV{},
keyIter: PrefixBytesIter{},
}
// Allocate a keyIter buffer that's large enough to hold the largest user
// key in the block.
i.keyIter.Alloc(int(r.maximumKeyLength))
return i.keySeeker.Init(r)
}

Expand Down
20 changes: 9 additions & 11 deletions sstable/colblk/prefix_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ type PrefixBytesIter struct {
}

// SetAt updates the provided PrefixBytesIter to hold the i'th []byte slice in
// the PrefixBytes.
// the PrefixBytes. The PrefixBytesIter's buffer must be sufficiently large to
// hold the i'th []byte slice, and the caller is required to statically ensure
// this.
func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {
// Determine the offset and length of the bundle prefix.
bundleOffsetIndex := b.bundleOffsetIndexForRow(i)
Expand All @@ -268,8 +270,7 @@ func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {
// Grow the size of the iterator's buffer if necessary.
it.len = b.sharedPrefixLen + int(it.bundlePrefixLen) + int(rowSuffixEnd-rowSuffixStart)
if it.len > it.cap {
it.cap = it.len << 1
it.ptr = mallocgc(uintptr(it.cap), nil, false)
panic(errors.AssertionFailedf("buffer too small: %d > %d", it.len, it.cap))
}

// Copy the shared key prefix.
Expand All @@ -291,7 +292,9 @@ func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {

// SetNext updates the provided PrefixBytesIter to hold the next []byte slice in
// the PrefixBytes. SetNext requires the provided iter to currently hold a slice
// and for a subsequent slice to exist within the PrefixBytes.
// and for a subsequent slice to exist within the PrefixBytes. The
// PrefixBytesIter's buffer must be sufficiently large to hold the next []byte
// slice, and the caller is required to statically ensure this.
func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
it.offsetIndex++
// If the next row is in the same bundle, we can take a fast path of only
Expand All @@ -308,10 +311,7 @@ func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
// Grow the buffer if necessary.
it.len = b.sharedPrefixLen + int(it.bundlePrefixLen) + int(rowSuffixEnd-rowSuffixStart)
if it.len > it.cap {
it.cap = it.len << 1
prevPtr := it.ptr
it.ptr = mallocgc(uintptr(it.cap), nil, false)
memmove(it.ptr, prevPtr, uintptr(b.sharedPrefixLen)+uintptr(it.bundlePrefixLen))
panic(errors.AssertionFailedf("buffer too small: %d > %d", it.len, it.cap))
}
// Copy in the per-row suffix.
memmove(
Expand All @@ -337,9 +337,7 @@ func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
// Grow the buffer if necessary.
it.len = b.sharedPrefixLen + int(it.bundlePrefixLen) + int(rowSuffixEnd-rowSuffixStart)
if it.len > it.cap {
it.cap = it.len << 1
it.ptr = mallocgc(uintptr(it.cap), nil, false)
memmove(it.ptr, b.rawBytes.data, uintptr(b.sharedPrefixLen))
panic(errors.AssertionFailedf("buffer too small: %d > %d", it.len, it.cap))
}
// Copy in the new bundle suffix.
memmove(
Expand Down
1 change: 1 addition & 0 deletions sstable/colblk/prefix_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func BenchmarkPrefixBytes(b *testing.B) {
pb, _ := DecodePrefixBytes(buf, 0, n)
b.ResetTimer()
var pbi PrefixBytesIter
pbi.Alloc(maxLen)
for i := 0; i < b.N; i++ {
j := i % n
if j == 0 {
Expand Down
Loading

0 comments on commit bb3b71d

Please sign in to comment.