Skip to content

Commit

Permalink
colblk: handle externally-stored values
Browse files Browse the repository at this point in the history
Add handling of values stored out-of-band in value blocks through the addition
of a new is-value-external bitmap.
  • Loading branch information
jbowens committed Aug 15, 2024
1 parent 2c7ca59 commit 53f5316
Show file tree
Hide file tree
Showing 8 changed files with 1,165 additions and 712 deletions.
35 changes: 22 additions & 13 deletions sstable/colblk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,23 +418,14 @@ func (r *BlockReader) headerToBinFormatter(f *binfmt.Formatter) {
}
}

func (r *BlockReader) columnToBinFormatter(f *binfmt.Formatter, col, rows int) {
func (r *BlockReader) formatColumn(
f *binfmt.Formatter, col int, fn func(*binfmt.Formatter, DataType),
) {
f.CommentLine("data for column %d", col)
dataType := r.DataType(col)
colSize := r.pageStart(col+1) - r.pageStart(col)
endOff := f.Offset() + int(colSize)
switch dataType {
case DataTypeBool:
bitmapToBinFormatter(f, rows)
case DataTypeUint8, DataTypeUint16, DataTypeUint32, DataTypeUint64:
uintsToBinFormatter(f, rows, dataType, nil)
case DataTypePrefixBytes:
prefixBytesToBinFormatter(f, rows, nil)
case DataTypeBytes:
rawBytesToBinFormatter(f, rows, nil)
default:
panic("unimplemented")
}
fn(f, dataType)

// We expect formatting the column data to have consumed all the bytes
// between the column's pageOffset and the next column's pageOffset.
Expand All @@ -446,3 +437,21 @@ func (r *BlockReader) columnToBinFormatter(f *binfmt.Formatter, col, rows int) {
panic(fmt.Sprintf("expected f.Offset() = %d, but found %d; did column %s format too many bytes?", endOff, f.Offset(), dataType))
}
}

// columnToBinFormatter describes the encoded data of column col in f. The
// column is assumed to hold rows rows. The work is delegated to formatColumn,
// which supplies the column's data type; that type selects the formatting
// routine. Data types without a formatting routine panic.
func (r *BlockReader) columnToBinFormatter(f *binfmt.Formatter, col, rows int) {
	// formatByType dispatches on the column's data type. It closes over rows
	// so that each routine knows how many entries to format.
	formatByType := func(bf *binfmt.Formatter, dt DataType) {
		switch dt {
		case DataTypeBool:
			bitmapToBinFormatter(bf, rows)
		case DataTypeUint8, DataTypeUint16, DataTypeUint32, DataTypeUint64:
			uintsToBinFormatter(bf, rows, dt, nil)
		case DataTypePrefixBytes:
			prefixBytesToBinFormatter(bf, rows, nil)
		case DataTypeBytes:
			rawBytesToBinFormatter(bf, rows, nil)
		default:
			panic("unimplemented")
		}
	}
	r.formatColumn(f, col, formatByType)
}
88 changes: 70 additions & 18 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/treeprinter"
"github.com/cockroachdb/pebble/sstable/block"
)

// KeySchema defines the schema of a user key, as defined by the user's
Expand Down Expand Up @@ -348,11 +350,17 @@ type DataBlockWriter struct {
// represents when the prefix stays the same, which is expected to be a
// rarer case. Before Finish-ing the column, we invert the bitmap.
prefixSame BitmapBuilder
// values is the column writer for values.
// values is the column writer for values. Iff the isValueExternal bitmap
// indicates a value is external, the value is prefixed with a ValuePrefix
// byte.
values RawBytesBuilder
// isValueExternal is the column writer for the is-value-external bitmap
// that indicates when a value is stored out-of-band in a value block.
isValueExternal BitmapBuilder

enc blockEncoder
rows int
enc blockEncoder
rows int
valuePrefixTmp [1]byte
}

// TODO(jackson): Add an isObsolete bitmap column.
Expand All @@ -361,6 +369,7 @@ const (
dataBlockColumnTrailer = iota
dataBlockColumnPrefixChanged
dataBlockColumnValue
dataBlockColumnIsValueExternal
dataBlockColumnMax
)

Expand All @@ -371,6 +380,7 @@ func (w *DataBlockWriter) Init(schema KeySchema) {
w.trailers.Init()
w.prefixSame.Reset()
w.values.Init()
w.isValueExternal.Reset()
w.rows = 0
}

Expand All @@ -380,6 +390,7 @@ func (w *DataBlockWriter) Reset() {
w.trailers.Reset()
w.prefixSame.Reset()
w.values.Reset()
w.isValueExternal.Reset()
w.rows = 0
w.enc.reset()
}
Expand All @@ -403,6 +414,10 @@ func (w *DataBlockWriter) String() string {
w.values.WriteDebug(&buf, w.rows)
fmt.Fprintln(&buf)

fmt.Fprintf(&buf, "%d: is-value-ext: ", len(w.Schema.ColumnTypes)+dataBlockColumnIsValueExternal)
w.isValueExternal.WriteDebug(&buf, w.rows)
fmt.Fprintln(&buf)

return buf.String()
}

Expand All @@ -414,13 +429,24 @@ func (w *DataBlockWriter) String() string {
//
// The caller is required to pass this in because in expected use cases, the
// caller will also require the same information.
func (w *DataBlockWriter) Add(ikey base.InternalKey, value []byte, kcmp KeyComparison) {
func (w *DataBlockWriter) Add(
ikey base.InternalKey, value []byte, valuePrefix block.ValuePrefix, kcmp KeyComparison,
) {
w.KeyWriter.WriteKey(w.rows, ikey.UserKey, kcmp.PrefixLen, kcmp.CommonPrefixLen)
if kcmp.PrefixEqual() {
w.prefixSame.Set(w.rows, true)
}
w.trailers.Set(w.rows, uint64(ikey.Trailer))
w.values.Put(value)
if valuePrefix.IsValueHandle() {
w.isValueExternal.Set(w.rows, true)
// Write the value with the value prefix byte preceding the value.
w.valuePrefixTmp[0] = byte(valuePrefix)
w.values.PutConcat(w.valuePrefixTmp[:], value)
} else {
// Elide the value prefix. Readers will examine the isValueExternal
// bitmap and know there is no value prefix byte if !isValueExternal.
w.values.Put(value)
}
w.rows++
}

Expand All @@ -436,6 +462,7 @@ func (w *DataBlockWriter) Size() int {
off = w.trailers.Size(w.rows, off)
off = w.prefixSame.Size(w.rows, off)
off = w.values.Size(w.rows, off)
off = w.isValueExternal.Size(w.rows, off)
off++ // trailer padding byte
return int(off)
}
Expand All @@ -461,8 +488,9 @@ func (w *DataBlockWriter) Finish() []byte {
w.prefixSame.Invert(w.rows)
w.enc.encode(w.rows, &w.prefixSame)

// Write the value column.
// Write the value columns.
w.enc.encode(w.rows, &w.values)
w.enc.encode(w.rows, &w.isValueExternal)
return w.enc.finish()
}

Expand All @@ -482,7 +510,15 @@ type DataBlockReader struct {
// Split) of a key changes, relative to the preceding key. This is used to
// bound seeks within a prefix, and to optimize NextPrefix.
prefixChanged Bitmap
values RawBytes
// values is the column reader for values. If the isValueExternal bitmap
// indicates a value is external, the value is prefixed with a ValuePrefix
// byte.
values RawBytes
// isValueExternal is the column reader for the is-value-external bitmap
// that indicates whether a value is stored out-of-band in a value block. If
// true, the value contains a ValuePrefix byte followed by an encoded value
// handle indicating the value's location within the value block(s).
isValueExternal Bitmap
}

// BlockReader returns a pointer to the underlying BlockReader.
Expand All @@ -494,15 +530,24 @@ func (r *DataBlockReader) BlockReader() *BlockReader {
func (r *DataBlockReader) Init(schema KeySchema, data []byte) {
r.r.Init(data, 0)
r.trailers = r.r.Uint64s(len(schema.ColumnTypes) + dataBlockColumnTrailer)
r.values = r.r.RawBytes(len(schema.ColumnTypes) + dataBlockColumnValue)
r.prefixChanged = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnPrefixChanged)
r.values = r.r.RawBytes(len(schema.ColumnTypes) + dataBlockColumnValue)
r.isValueExternal = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsValueExternal)
}

// toFormatter writes a description of the block to f: first the block header,
// then each column in increasing index order.
func (r *DataBlockReader) toFormatter(f *binfmt.Formatter) {
	r.r.headerToBinFormatter(f)
	// Read the row and column counts from the header once, up front.
	rowCount := int(r.r.header.Rows)
	colCount := int(r.r.header.Columns)
	for col := 0; col < colCount; col++ {
		r.r.columnToBinFormatter(f, col, rowCount)
	}
}

// DataBlockIter iterates over a columnar data block.
type DataBlockIter struct {
// configuration
r *DataBlockReader
keySeeker KeySeeker
r *DataBlockReader
keySeeker KeySeeker
getLazyValue func([]byte) base.LazyValue

// state
keyIter PrefixBytesIter
Expand All @@ -515,14 +560,17 @@ var _ base.InternalIterator = (*DataBlockIter)(nil)

// Init initializes the data block iterator, configuring it to read from the
// provided reader.
func (i *DataBlockIter) Init(r *DataBlockReader, keyIterator KeySeeker) error {
func (i *DataBlockIter) Init(
r *DataBlockReader, keyIterator KeySeeker, getLazyValue func([]byte) base.LazyValue,
) error {
*i = DataBlockIter{
r: r,
keySeeker: keyIterator,
row: -1,
kvRow: math.MinInt,
kv: base.InternalKV{},
keyIter: PrefixBytesIter{},
r: r,
keySeeker: keyIterator,
getLazyValue: getLazyValue,
row: -1,
kvRow: math.MinInt,
kv: base.InternalKV{},
keyIter: PrefixBytesIter{},
}
return i.keySeeker.Init(r)
}
Expand Down Expand Up @@ -654,8 +702,12 @@ func (i *DataBlockIter) decodeRow(row int) *base.InternalKV {
UserKey: i.keySeeker.MaterializeUserKey(&i.keyIter, i.kvRow, row),
Trailer: base.InternalKeyTrailer(i.r.trailers.At(row)),
},
}
if i.r.isValueExternal.At(row) {
i.kv.V = i.getLazyValue(i.r.values.At(row))
} else {
// TODO(peter): Does manually inlining Bytes.At help?
V: base.MakeInPlaceValue(i.r.values.At(row)),
i.kv.V = base.MakeInPlaceValue(i.r.values.At(row))
}
i.row = row
i.kvRow = row
Expand Down
21 changes: 14 additions & 7 deletions sstable/colblk/data_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/cockroachdb/pebble/internal/itertest"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable/block"
"golang.org/x/exp/rand"
)

Expand Down Expand Up @@ -46,22 +47,27 @@ func TestDataBlock(t *testing.T) {
ik := base.ParsePrettyInternalKey(line[:j])

kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
valueString := line[j+1:]
vp := block.InPlaceValuePrefix(kcmp.PrefixEqual())
if strings.HasPrefix(valueString, "valueHandle") {
vp = block.ValueHandlePrefix(kcmp.PrefixEqual(), 0)
}
v := []byte(line[j+1:])
w.Add(ik, v, kcmp)
w.Add(ik, v, vp, kcmp)
}
fmt.Fprint(&buf, &w)
return buf.String()
case "finish":
block := w.Finish()
r.Init(testKeysSchema, block)
f := binfmt.New(r.r.data).LineWidth(20)
r.r.headerToBinFormatter(f)
for i := 0; i < int(r.r.header.Columns); i++ {
r.r.columnToBinFormatter(f, i, int(r.r.header.Rows))
}
r.toFormatter(f)

return f.String()
case "iter":
it.Init(&r, testKeysSchema.NewKeySeeker())
it.Init(&r, testKeysSchema.NewKeySeeker(), func([]byte) base.LazyValue {
return base.LazyValue{ValueOrHandle: []byte("mock external value")}
})
return itertest.RunInternalIterCmd(t, td, &it)
default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
Expand Down Expand Up @@ -96,7 +102,8 @@ func benchmarkDataBlockWriter(b *testing.B, prefixSize, valueSize int) {
for w.Size() < targetBlockSize {
ik := base.MakeInternalKey(keys[j], base.SeqNum(rng.Uint64n(uint64(base.SeqNumMax))), base.InternalKeyKindSet)
kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
w.Add(ik, values[j], kcmp)
vp := block.InPlaceValuePrefix(kcmp.PrefixEqual())
w.Add(ik, values[j], vp, kcmp)
j++
}
w.Finish()
Expand Down
4 changes: 4 additions & 0 deletions sstable/colblk/raw_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package colblk

import (
"bytes"
"fmt"
"io"
"unsafe"
Expand Down Expand Up @@ -72,6 +73,9 @@ func DecodeRawBytes(b []byte, offset uint32, count int) (rawBytes RawBytes, endO
var _ DecodeFunc[RawBytes] = DecodeRawBytes

// defaultSliceFormatter renders x as a string for display. A slice made up
// entirely of printable ASCII (values 32 through 126) is returned verbatim.
// Any other slice is returned in Go-quoted form so that control characters and
// non-ASCII bytes remain legible.
func defaultSliceFormatter(x []byte) string {
	outsidePrintableASCII := func(r rune) bool { return r < 32 || r > 126 }
	if bytes.IndexFunc(x, outsidePrintableASCII) < 0 {
		// Nothing needs escaping.
		return string(x)
	}
	return fmt.Sprintf("%q", x)
}

Expand Down
Loading

0 comments on commit 53f5316

Please sign in to comment.