colblk: add a KeySchema-specific header
We add a fixed-size KeySchema-specific header to the beginning of the
custom block header. A KeySchema implementation can store information
there and retrieve it when initializing the key seeker.

We use this to add a byte to cockroach blocks which indicates what
types of suffixes are contained in the block (mvcc, empty, non-mvcc).
This will be used for optimized fast paths for the common cases.
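
For orientation, here is a minimal, self-contained sketch of what the new one-byte header encodes for cockroach blocks. It mirrors the suffixTypes bitfield and the WriteKey classification added in the diff below; the classify helper, the main function, and the literal inputs are invented for the example.

package main

import "fmt"

// suffixTypes mirrors the bitfield added in this commit: one bit per kind of
// suffix observed while writing keys into a block.
type suffixTypes uint8

const (
    hasMVCCSuffixes suffixTypes = 1 << iota
    hasEmptySuffixes
    hasNonMVCCSuffixes
)

// classify follows the WriteKey logic: MVCC if either timestamp component is
// set, empty if there is no untyped version, non-MVCC otherwise.
func classify(wallTime uint64, logicalTime uint32, untypedVersion []byte) suffixTypes {
    switch {
    case wallTime != 0 || logicalTime != 0:
        return hasMVCCSuffixes
    case len(untypedVersion) == 0:
        return hasEmptySuffixes
    default:
        return hasNonMVCCSuffixes
    }
}

func main() {
    // Accumulate the bitfield while "writing" three keys, then round-trip it
    // through a one-byte header, as FinishHeader and KeySchemaHeader do below.
    var s suffixTypes
    s |= classify(12345, 0, nil)        // MVCC suffix
    s |= classify(0, 0, nil)            // no suffix
    s |= classify(0, 0, []byte("lock")) // non-MVCC suffix

    header := [1]byte{byte(s)}        // writer side: KeyWriter.FinishHeader
    decoded := suffixTypes(header[0]) // reader side: DataBlockDecoder.KeySchemaHeader
    fmt.Printf("header=0x%02x decoded=%03b\n", header[0], decoded)
}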
RaduBerinde committed Oct 21, 2024
1 parent ad898ed commit 410a16c
Showing 8 changed files with 107 additions and 37 deletions.
83 changes: 69 additions & 14 deletions internal/crdbtest/crdb.go
@@ -14,6 +14,7 @@ import (
"io"
"math/rand/v2"
"slices"
"strings"
"time"
"unsafe"

@@ -517,20 +518,16 @@ const (
)

var KeySchema = colblk.KeySchema{
Name: "crdb1",
Name: "crdb1",
HeaderSize: 1,
ColumnTypes: []colblk.DataType{
cockroachColRoachKey: colblk.DataTypePrefixBytes,
cockroachColMVCCWallTime: colblk.DataTypeUint,
cockroachColMVCCLogical: colblk.DataTypeUint,
cockroachColUntypedVersion: colblk.DataTypeBytes,
},
NewKeyWriter: func() colblk.KeyWriter {
kw := &cockroachKeyWriter{}
kw.roachKeys.Init(16)
kw.wallTimes.Init()
kw.logicalTimes.InitWithDefault()
kw.untypedVersions.Init()
return kw
return makeCockroachKeyWriter()
},
InitKeySeekerMetadata: func(meta *colblk.KeySeekerMetadata, d *colblk.DataBlockDecoder) {
ks := (*cockroachKeySeeker)(unsafe.Pointer(meta))
@@ -541,14 +538,64 @@ var KeySchema = colblk.KeySchema{
},
}

// suffixTypes is a bitfield indicating what kind of suffixes are present in a
// block.
type suffixTypes uint8

const (
// hasMVCCSuffixes is set if there is at least one key with an MVCC suffix in
// the block.
hasMVCCSuffixes suffixTypes = (1 << iota)
// hasEmptySuffixes is set if there is at least one key with no suffix in the block.
hasEmptySuffixes
// hasNonMVCCSuffixes is set if there is at least one key with a non-empty,
// non-MVCC suffix.
hasNonMVCCSuffixes
)

func (s suffixTypes) String() string {
var suffixes []string
if s&hasMVCCSuffixes != 0 {
suffixes = append(suffixes, "mvcc")
}
if s&hasEmptySuffixes != 0 {
suffixes = append(suffixes, "empty")
}
if s&hasNonMVCCSuffixes != 0 {
suffixes = append(suffixes, "non-mvcc")
}
if len(suffixes) == 0 {
return "none"
}
return strings.Join(suffixes, ",")
}

type cockroachKeyWriter struct {
roachKeys colblk.PrefixBytesBuilder
wallTimes colblk.UintBuilder
logicalTimes colblk.UintBuilder
untypedVersions colblk.RawBytesBuilder
suffixTypes suffixTypes
prevSuffix []byte
}

func makeCockroachKeyWriter() *cockroachKeyWriter {
kw := &cockroachKeyWriter{}
kw.roachKeys.Init(16)
kw.wallTimes.Init()
kw.logicalTimes.InitWithDefault()
kw.untypedVersions.Init()
return kw
}

func (kw *cockroachKeyWriter) Reset() {
kw.roachKeys.Reset()
kw.wallTimes.Reset()
kw.logicalTimes.Reset()
kw.untypedVersions.Reset()
kw.suffixTypes = 0
}

func (kw *cockroachKeyWriter) ComparePrev(key []byte) colblk.KeyComparison {
var cmpv colblk.KeyComparison
cmpv.PrefixLen = int32(Split(key)) // TODO(jackson): Inline
@@ -596,6 +643,13 @@ func (kw *cockroachKeyWriter) WriteKey(
kw.logicalTimes.Set(row, uint64(logicalTime))
}
kw.untypedVersions.Put(untypedVersion)
if wallTime != 0 || logicalTime != 0 {
kw.suffixTypes |= hasMVCCSuffixes
} else if len(untypedVersion) == 0 {
kw.suffixTypes |= hasEmptySuffixes
} else {
kw.suffixTypes |= hasNonMVCCSuffixes
}
}

func (kw *cockroachKeyWriter) MaterializeKey(dst []byte, i int) []byte {
@@ -610,13 +664,6 @@ func (kw *cockroachKeyWriter) MaterializeKey(dst []byte, i int) []byte {
return AppendTimestamp(dst, kw.wallTimes.Get(i), uint32(kw.logicalTimes.Get(i)))
}

func (kw *cockroachKeyWriter) Reset() {
kw.roachKeys.Reset()
kw.wallTimes.Reset()
kw.logicalTimes.Reset()
kw.untypedVersions.Reset()
}

func (kw *cockroachKeyWriter) WriteDebug(dst io.Writer, rows int) {
fmt.Fprint(dst, "prefixes: ")
kw.roachKeys.WriteDebug(dst, rows)
@@ -630,6 +677,8 @@ func (kw *cockroachKeyWriter) WriteDebug(dst io.Writer, rows int) {
fmt.Fprint(dst, "untyped suffixes: ")
kw.untypedVersions.WriteDebug(dst, rows)
fmt.Fprintln(dst)
fmt.Fprint(dst, "suffix types: ")
fmt.Fprintln(dst, kw.suffixTypes.String())
}

func (kw *cockroachKeyWriter) NumColumns() int {
@@ -665,12 +714,17 @@ func (kw *cockroachKeyWriter) Finish(
}
}

func (kw *cockroachKeyWriter) FinishHeader(buf []byte) {
buf[0] = byte(kw.suffixTypes)
}

type cockroachKeySeeker struct {
roachKeys colblk.PrefixBytes
roachKeyChanged colblk.Bitmap
mvccWallTimes colblk.UnsafeUints
mvccLogical colblk.UnsafeUints
untypedVersions colblk.RawBytes
suffixTypes suffixTypes
}

// Assert that the cockroachKeySeeker fits inside KeySeekerMetadata.
@@ -685,6 +739,7 @@ func (ks *cockroachKeySeeker) init(d *colblk.DataBlockDecoder) {
ks.mvccWallTimes = bd.Uints(cockroachColMVCCWallTime)
ks.mvccLogical = bd.Uints(cockroachColMVCCLogical)
ks.untypedVersions = bd.RawBytes(cockroachColUntypedVersion)
ks.suffixTypes = suffixTypes(d.KeySchemaHeader(1)[0])
}

// IsLowerBound is part of the KeySeeker interface.
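
The optimized fast paths mentioned in the commit message are not part of this change. As a purely hypothetical sketch of the kind of shortcut the suffixTypes byte enables, a seeker that knows a block holds only MVCC suffixes could order versions by wall time alone; the seekVersion helper and its inputs below are invented for illustration.

package main

import "fmt"

type suffixTypes uint8

const (
    hasMVCCSuffixes suffixTypes = 1 << iota
    hasEmptySuffixes
    hasNonMVCCSuffixes
)

// seekVersion returns the index of the first row whose wall time is at or
// below seekWallTime; wallTimes is sorted descending, as MVCC versions are
// within a prefix.
func seekVersion(st suffixTypes, wallTimes []uint64, seekWallTime uint64) int {
    if st == hasMVCCSuffixes {
        // Hypothetical fast path: every suffix in the block is an MVCC
        // timestamp, so the wall-time column alone determines the order and
        // untyped versions never need to be materialized or compared.
        for i, wt := range wallTimes {
            if wt <= seekWallTime {
                return i
            }
        }
        return len(wallTimes)
    }
    // Slow path (omitted): fall back to materializing full suffixes and using
    // the general suffix comparison.
    return -1
}

func main() {
    fmt.Println(seekVersion(hasMVCCSuffixes, []uint64{300, 200, 100}, 250)) // prints 1
}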
2 changes: 1 addition & 1 deletion sstable/block/block.go
@@ -157,7 +157,7 @@ type Metadata [MetadataSize]byte

// MetadataSize is the size of the metadata. The value is chosen to fit a
// colblk.DataBlockDecoder and a CockroachDB colblk.KeySeeker.
const MetadataSize = 328
const MetadataSize = 336

// Assert that MetadataSize is a multiple of 8. This is necessary to keep the
// block data buffer aligned.
8 changes: 4 additions & 4 deletions sstable/block/testdata/flush_governor
@@ -29,8 +29,8 @@ should not flush
init target-block-size=1000 size-class-aware-threshold=60 size-classes=(820, 1020, 1320, 1820)
----
low watermark: 600
high watermark: 1460
boundaries: [660 960]
high watermark: 1452
boundaries: [652 952]

# Should not flush when the "after" block fits in the same size class.
should-flush size-before=600 size-after=650
@@ -89,8 +89,8 @@ boundaries: []
init target-block-size=32768 jemalloc-size-classes
----
low watermark: 19661
high watermark: 40600
boundaries: [20120 24216 28312 32408]
high watermark: 40592
boundaries: [20112 24208 28304 32400]

# We should flush to avoid exceeding the boundary.
should-flush size-before=32000 size-after=32766
6 changes: 3 additions & 3 deletions sstable/colblk/block.go
@@ -184,9 +184,9 @@ func (h Header) Encode(buf []byte) {
// blockHeaderSize returns the size of the block header, including column
// headers, for a block with the specified number of columns and optionally a
// custom header size.
func blockHeaderSize(cols int, customHeaderSize int) uint32 {
func blockHeaderSize(cols int, customHeaderSize uint32) uint32 {
// Each column has a 1-byte DataType and a 4-byte offset into the block.
return uint32(blockHeaderBaseSize + cols*columnHeaderSize + customHeaderSize)
return uint32(blockHeaderBaseSize+cols*columnHeaderSize) + customHeaderSize
}

// DecodeHeader reads the block header from the provided serialized columnar
@@ -217,7 +217,7 @@ func (e *blockEncoder) reset() {

// init initializes the block encoder with a buffer of the specified size and
// header.
func (e *blockEncoder) init(size int, h Header, customHeaderSize int) {
func (e *blockEncoder) init(size int, h Header, customHeaderSize uint32) {
if cap(e.buf) < size {
e.buf = crbytes.AllocAligned(size)
} else {
33 changes: 24 additions & 9 deletions sstable/colblk/data_block.go
@@ -33,7 +33,10 @@ import (
// attributes inlined within the data block. For inlined-values, the
// user-defined value columns would be implicitly null.
type KeySchema struct {
Name string
Name string
// KeySchema implementations can optionally make use of a fixed-size custom
// header inside each block.
HeaderSize uint32
ColumnTypes []DataType
NewKeyWriter func() KeyWriter

@@ -63,7 +66,7 @@ type KeySeekerMetadata [KeySeekerMetadataSize]byte

// KeySeekerMetadataSize is chosen to fit the CockroachDB key seeker
// implementation.
const KeySeekerMetadataSize = 168
const KeySeekerMetadataSize = 176

// A KeyWriter maintains ColumnWriters for a data block for writing user keys
// into the database-specific key schema. Users may define their own key schema
@@ -90,6 +93,8 @@ type KeyWriter interface {
// MaterializeKey appends the zero-indexed row'th key written to dst,
// returning the result.
MaterializeKey(dst []byte, row int) []byte
// FinishHeader serializes an internal header of exactly KeySchema.HeaderSize bytes.
FinishHeader(dst []byte)
}

// KeyComparison holds information about a key and its comparison to another a
@@ -177,6 +182,7 @@ var defaultSchemaColumnTypes = []DataType{
func DefaultKeySchema(comparer *base.Comparer, prefixBundleSize int) KeySchema {
return KeySchema{
Name: fmt.Sprintf("DefaultKeySchema(%s,%d)", comparer.Name, prefixBundleSize),
HeaderSize: 0,
ColumnTypes: defaultSchemaColumnTypes,
NewKeyWriter: func() KeyWriter {
kw := &defaultKeyWriter{comparer: comparer}
@@ -300,6 +306,8 @@ func (w *defaultKeyWriter) Size(rows int, offset uint32) uint32 {
return offset
}

func (w *defaultKeyWriter) FinishHeader([]byte) {}

func (w *defaultKeyWriter) Finish(col, rows int, offset uint32, buf []byte) (nextOffset uint32) {
switch col {
case defaultKeySchemaColumnPrefix:
@@ -464,6 +472,7 @@ const (
// contained within the block. This is used by iterators to avoid the need to
// grow key buffers while iterating over the block, ensuring that the key buffer
// is always sufficiently large.
// This is serialized immediately after the KeySchema-specific header.
const dataBlockCustomHeaderSize = 4

// Init initializes the data block writer.
@@ -571,7 +580,7 @@ func (w *DataBlockEncoder) Rows() int {

// Size returns the size of the current pending data block.
func (w *DataBlockEncoder) Size() int {
off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, dataBlockCustomHeaderSize)
off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, dataBlockCustomHeaderSize+w.Schema.HeaderSize)
off = w.KeyWriter.Size(w.rows, off)
off = w.trailers.Size(w.rows, off)
off = w.prefixSame.InvertedSize(w.rows, off)
@@ -607,11 +616,12 @@ func (w *DataBlockEncoder) Finish(rows, size int) (finished []byte, lastKey base
// to represent when the prefix changes.
w.prefixSame.Invert(rows)

w.enc.init(size, h, dataBlockCustomHeaderSize)

// Write the max key length in the custom header.
binary.LittleEndian.PutUint32(w.enc.data()[:dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))
w.enc.init(size, h, dataBlockCustomHeaderSize+w.Schema.HeaderSize)

// Write the key schema custom header.
w.KeyWriter.FinishHeader(w.enc.data()[:w.Schema.HeaderSize])
// Write the max key length in the data block custom header.
binary.LittleEndian.PutUint32(w.enc.data()[w.Schema.HeaderSize:w.Schema.HeaderSize+dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))
w.enc.encode(rows, w.KeyWriter)
w.enc.encode(rows, &w.trailers)
w.enc.encode(rows, &w.prefixSame)
@@ -864,18 +874,23 @@ func (d *DataBlockDecoder) PrefixChanged() Bitmap {
return d.prefixChanged
}

// KeySchemaHeader returns the KeySchema-specific header of fixed size.
func (d *DataBlockDecoder) KeySchemaHeader(schemaHeaderSize uint32) []byte {
return d.d.data[:schemaHeaderSize]
}

// Init initializes the data block reader with the given serialized data block.
func (d *DataBlockDecoder) Init(schema *KeySchema, data []byte) {
if uintptr(unsafe.Pointer(unsafe.SliceData(data)))&7 != 0 {
panic("data buffer not 8-byte aligned")
}
d.d.Init(data, dataBlockCustomHeaderSize)
d.d.Init(data, dataBlockCustomHeaderSize+schema.HeaderSize)
d.trailers = d.d.Uints(len(schema.ColumnTypes) + dataBlockColumnTrailer)
d.prefixChanged = d.d.Bitmap(len(schema.ColumnTypes) + dataBlockColumnPrefixChanged)
d.values = d.d.RawBytes(len(schema.ColumnTypes) + dataBlockColumnValue)
d.isValueExternal = d.d.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsValueExternal)
d.isObsolete = d.d.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsObsolete)
d.maximumKeyLength = binary.LittleEndian.Uint32(data[:dataBlockCustomHeaderSize])
d.maximumKeyLength = binary.LittleEndian.Uint32(data[schema.HeaderSize:])
}

// Describe describes the binary format of the data block, assuming f.Offset()
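
To recap the header layout implied by the data_block.go changes above: the KeySchema-specific header occupies the first KeySchema.HeaderSize bytes of the block, and the existing 4-byte maximum-key-length field now follows it. A minimal, self-contained sketch (sizes and values are illustrative; the crdb1 schema uses a HeaderSize of 1):

package main

import (
    "encoding/binary"
    "fmt"
)

const (
    schemaHeaderSize          = 1 // e.g. the crdb1 suffixTypes byte
    dataBlockCustomHeaderSize = 4 // uint32 maximum key length
)

func main() {
    buf := make([]byte, schemaHeaderSize+dataBlockCustomHeaderSize)

    // Writer side: KeyWriter.FinishHeader fills the first HeaderSize bytes,
    // then the encoder writes the max key length right after them.
    buf[0] = 0x01 // suffixTypes: hasMVCCSuffixes
    binary.LittleEndian.PutUint32(buf[schemaHeaderSize:], 42)

    // Reader side: the decoder hands the first HeaderSize bytes to the key
    // seeker (KeySchemaHeader) and decodes the max key length just after them.
    schemaHeader := buf[:schemaHeaderSize]
    maxKeyLength := binary.LittleEndian.Uint32(buf[schemaHeaderSize:])
    fmt.Printf("suffixTypes=0x%02x maxKeyLength=%d\n", schemaHeader[0], maxKeyLength)
}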
2 changes: 1 addition & 1 deletion testdata/event_listener
@@ -232,7 +232,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 2.1KB
Compression types: snappy: 3
Block cache: 2 entries (768B) hit rate: 0.0%
Block cache: 2 entries (784B) hit rate: 0.0%
Table cache: 0 entries (0B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
2 changes: 1 addition & 1 deletion testdata/ingest
@@ -53,7 +53,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 569B
Compression types: snappy: 1
Block cache: 3 entries (1.0KB) hit rate: 18.2%
Block cache: 3 entries (1.1KB) hit rate: 18.2%
Table cache: 1 entries (760B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down
8 changes: 4 additions & 4 deletions testdata/metrics
@@ -75,7 +75,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 0.0%
Block cache: 2 entries (716B) hit rate: 0.0%
Table cache: 1 entries (760B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
@@ -132,7 +132,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Block cache: 2 entries (716B) hit rate: 33.3%
Table cache: 2 entries (1.5KB) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
@@ -176,7 +176,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Block cache: 2 entries (716B) hit rate: 33.3%
Table cache: 2 entries (1.5KB) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
@@ -217,7 +217,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Block cache: 2 entries (716B) hit rate: 33.3%
Table cache: 1 entries (760B) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
