Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colblk: add a KeySchema-specific header #4084

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 69 additions & 14 deletions internal/crdbtest/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io"
"math/rand/v2"
"slices"
"strings"
"time"
"unsafe"

Expand Down Expand Up @@ -517,20 +518,16 @@ const (
)

var KeySchema = colblk.KeySchema{
Name: "crdb1",
Name: "crdb1",
HeaderSize: 1,
ColumnTypes: []colblk.DataType{
cockroachColRoachKey: colblk.DataTypePrefixBytes,
cockroachColMVCCWallTime: colblk.DataTypeUint,
cockroachColMVCCLogical: colblk.DataTypeUint,
cockroachColUntypedVersion: colblk.DataTypeBytes,
},
NewKeyWriter: func() colblk.KeyWriter {
kw := &cockroachKeyWriter{}
kw.roachKeys.Init(16)
kw.wallTimes.Init()
kw.logicalTimes.InitWithDefault()
kw.untypedVersions.Init()
return kw
return makeCockroachKeyWriter()
},
InitKeySeekerMetadata: func(meta *colblk.KeySeekerMetadata, d *colblk.DataBlockDecoder) {
ks := (*cockroachKeySeeker)(unsafe.Pointer(meta))
Expand All @@ -541,14 +538,64 @@ var KeySchema = colblk.KeySchema{
},
}

// suffixTypes is a bitfield indicating what kinds of key suffixes are
// present in a block. It is serialized as the single-byte KeySchema-specific
// block header.
type suffixTypes uint8

const (
	// hasMVCCSuffixes is set if there is at least one key with an MVCC suffix
	// in the block.
	hasMVCCSuffixes suffixTypes = (1 << iota)
	// hasEmptySuffixes is set if there is at least one key with no suffix in
	// the block.
	hasEmptySuffixes
	// hasNonMVCCSuffixes is set if there is at least one key with a non-empty,
	// non-MVCC suffix.
	hasNonMVCCSuffixes
)

// String returns a comma-separated list of the suffix kinds present in the
// bitfield ("mvcc", "empty", "non-mvcc"), or "none" if no bits are set.
func (s suffixTypes) String() string {
	var suffixes []string
	if s&hasMVCCSuffixes != 0 {
		suffixes = append(suffixes, "mvcc")
	}
	if s&hasEmptySuffixes != 0 {
		suffixes = append(suffixes, "empty")
	}
	// Bug fix: the original tested hasEmptySuffixes a second time here, so
	// hasNonMVCCSuffixes was never reported and "non-mvcc" was incorrectly
	// emitted whenever empty suffixes were present.
	if s&hasNonMVCCSuffixes != 0 {
		suffixes = append(suffixes, "non-mvcc")
	}
	if len(suffixes) == 0 {
		return "none"
	}
	return strings.Join(suffixes, ",")
}

// cockroachKeyWriter accumulates the constituent columns of keys written to a
// data block under the CockroachDB key schema: the roach key prefix, the MVCC
// wall/logical timestamps, and any untyped (non-MVCC) version suffix.
type cockroachKeyWriter struct {
	// roachKeys holds the key prefixes, stored with prefix compression.
	roachKeys colblk.PrefixBytesBuilder
	// wallTimes holds the MVCC wall-time component of each key's suffix.
	wallTimes colblk.UintBuilder
	// logicalTimes holds the MVCC logical-time component of each key's suffix.
	logicalTimes colblk.UintBuilder
	// untypedVersions holds suffixes that are not MVCC timestamps.
	untypedVersions colblk.RawBytesBuilder
	// suffixTypes is a bitfield recording which kinds of suffixes have been
	// written to the current block; serialized by FinishHeader.
	suffixTypes suffixTypes
	// prevSuffix buffers the previously written key's suffix —
	// NOTE(review): presumably used by ComparePrev; confirm against the full
	// file, its use is not visible in this excerpt.
	prevSuffix []byte
}

// makeCockroachKeyWriter allocates a cockroachKeyWriter and initializes each
// of its column builders so it is ready to accept keys.
func makeCockroachKeyWriter() *cockroachKeyWriter {
	w := new(cockroachKeyWriter)
	w.roachKeys.Init(16)
	w.wallTimes.Init()
	w.logicalTimes.InitWithDefault()
	w.untypedVersions.Init()
	return w
}

// Reset clears all per-block state so the writer can be reused for a new
// data block.
//
// NOTE(review): prevSuffix is not cleared here; confirm that is intentional.
func (kw *cockroachKeyWriter) Reset() {
	kw.suffixTypes = 0
	kw.roachKeys.Reset()
	kw.wallTimes.Reset()
	kw.logicalTimes.Reset()
	kw.untypedVersions.Reset()
}

func (kw *cockroachKeyWriter) ComparePrev(key []byte) colblk.KeyComparison {
var cmpv colblk.KeyComparison
cmpv.PrefixLen = int32(Split(key)) // TODO(jackson): Inline
Expand Down Expand Up @@ -596,6 +643,13 @@ func (kw *cockroachKeyWriter) WriteKey(
kw.logicalTimes.Set(row, uint64(logicalTime))
}
kw.untypedVersions.Put(untypedVersion)
if wallTime != 0 || logicalTime != 0 {
kw.suffixTypes |= hasMVCCSuffixes
} else if len(untypedVersion) == 0 {
kw.suffixTypes |= hasEmptySuffixes
} else {
kw.suffixTypes |= hasNonMVCCSuffixes
}
}

func (kw *cockroachKeyWriter) MaterializeKey(dst []byte, i int) []byte {
Expand All @@ -610,13 +664,6 @@ func (kw *cockroachKeyWriter) MaterializeKey(dst []byte, i int) []byte {
return AppendTimestamp(dst, kw.wallTimes.Get(i), uint32(kw.logicalTimes.Get(i)))
}

func (kw *cockroachKeyWriter) Reset() {
kw.roachKeys.Reset()
kw.wallTimes.Reset()
kw.logicalTimes.Reset()
kw.untypedVersions.Reset()
}

func (kw *cockroachKeyWriter) WriteDebug(dst io.Writer, rows int) {
fmt.Fprint(dst, "prefixes: ")
kw.roachKeys.WriteDebug(dst, rows)
Expand All @@ -630,6 +677,8 @@ func (kw *cockroachKeyWriter) WriteDebug(dst io.Writer, rows int) {
fmt.Fprint(dst, "untyped suffixes: ")
kw.untypedVersions.WriteDebug(dst, rows)
fmt.Fprintln(dst)
fmt.Fprint(dst, "suffix types: ")
fmt.Fprintln(dst, kw.suffixTypes.String())
}

func (kw *cockroachKeyWriter) NumColumns() int {
Expand Down Expand Up @@ -665,12 +714,17 @@ func (kw *cockroachKeyWriter) Finish(
}
}

// FinishHeader serializes the KeySchema-specific block header — a single
// byte encoding the suffixTypes bitfield — into buf. buf must be exactly
// KeySchema.HeaderSize (1) bytes.
func (kw *cockroachKeyWriter) FinishHeader(buf []byte) {
	buf[0] = uint8(kw.suffixTypes)
}

// cockroachKeySeeker holds the decoded column accessors needed to seek within
// a data block written under the CockroachDB key schema. It is initialized
// from a colblk.DataBlockDecoder (see init) and must fit within
// colblk.KeySeekerMetadata (asserted below).
type cockroachKeySeeker struct {
	// roachKeys provides access to the prefix-compressed key prefixes.
	roachKeys colblk.PrefixBytes
	// roachKeyChanged marks rows whose prefix differs from the previous row.
	roachKeyChanged colblk.Bitmap
	// mvccWallTimes provides access to the MVCC wall-time column.
	mvccWallTimes colblk.UnsafeUints
	// mvccLogical provides access to the MVCC logical-time column.
	mvccLogical colblk.UnsafeUints
	// untypedVersions provides access to the non-MVCC suffix column.
	untypedVersions colblk.RawBytes
	// suffixTypes is the bitfield decoded from the block's 1-byte
	// KeySchema-specific header.
	suffixTypes suffixTypes
}

// Assert that the cockroachKeySeeker fits inside KeySeekerMetadata.
Expand All @@ -685,6 +739,7 @@ func (ks *cockroachKeySeeker) init(d *colblk.DataBlockDecoder) {
ks.mvccWallTimes = bd.Uints(cockroachColMVCCWallTime)
ks.mvccLogical = bd.Uints(cockroachColMVCCLogical)
ks.untypedVersions = bd.RawBytes(cockroachColUntypedVersion)
ks.suffixTypes = suffixTypes(d.KeySchemaHeader(1)[0])
}

// IsLowerBound is part of the KeySeeker interface.
Expand Down
2 changes: 1 addition & 1 deletion sstable/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type Metadata [MetadataSize]byte

// MetadataSize is the size of the metadata. The value is chosen to fit a
// colblk.DataBlockDecoder and a CockroachDB colblk.KeySeeker.
const MetadataSize = 328
const MetadataSize = 336

// Assert that MetadataSize is a multiple of 8. This is necessary to keep the
// block data buffer aligned.
Expand Down
8 changes: 4 additions & 4 deletions sstable/block/testdata/flush_governor
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ should not flush
init target-block-size=1000 size-class-aware-threshold=60 size-classes=(820, 1020, 1320, 1820)
----
low watermark: 600
high watermark: 1460
boundaries: [660 960]
high watermark: 1452
boundaries: [652 952]

# Should not flush when the "after" block fits in the same size class.
should-flush size-before=600 size-after=650
Expand Down Expand Up @@ -89,8 +89,8 @@ boundaries: []
init target-block-size=32768 jemalloc-size-classes
----
low watermark: 19661
high watermark: 40600
boundaries: [20120 24216 28312 32408]
high watermark: 40592
boundaries: [20112 24208 28304 32400]

# We should flush to avoid exceeding the boundary.
should-flush size-before=32000 size-after=32766
Expand Down
6 changes: 3 additions & 3 deletions sstable/colblk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ func (h Header) Encode(buf []byte) {
// blockHeaderSize returns the size of the block header, including column
// headers, for a block with the specified number of columns and optionally a
// custom header size.
func blockHeaderSize(cols int, customHeaderSize int) uint32 {
func blockHeaderSize(cols int, customHeaderSize uint32) uint32 {
// Each column has a 1-byte DataType and a 4-byte offset into the block.
return uint32(blockHeaderBaseSize + cols*columnHeaderSize + customHeaderSize)
return uint32(blockHeaderBaseSize+cols*columnHeaderSize) + customHeaderSize
}

// DecodeHeader reads the block header from the provided serialized columnar
Expand Down Expand Up @@ -217,7 +217,7 @@ func (e *blockEncoder) reset() {

// init initializes the block encoder with a buffer of the specified size and
// header.
func (e *blockEncoder) init(size int, h Header, customHeaderSize int) {
func (e *blockEncoder) init(size int, h Header, customHeaderSize uint32) {
if cap(e.buf) < size {
e.buf = crbytes.AllocAligned(size)
} else {
Expand Down
33 changes: 24 additions & 9 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
// attributes inlined within the data block. For inlined-values, the
// user-defined value columns would be implicitly null.
type KeySchema struct {
Name string
Name string
// KeySchema implementations can optionally make use of a fixed-size custom
// header inside each block.
HeaderSize uint32
ColumnTypes []DataType
NewKeyWriter func() KeyWriter

Expand Down Expand Up @@ -63,7 +66,7 @@ type KeySeekerMetadata [KeySeekerMetadataSize]byte

// KeySeekerMetadataSize is chosen to fit the CockroachDB key seeker
// implementation.
const KeySeekerMetadataSize = 168
const KeySeekerMetadataSize = 176

// A KeyWriter maintains ColumnWriters for a data block for writing user keys
// into the database-specific key schema. Users may define their own key schema
Expand All @@ -90,6 +93,8 @@ type KeyWriter interface {
// MaterializeKey appends the zero-indexed row'th key written to dst,
// returning the result.
MaterializeKey(dst []byte, row int) []byte
// FinishHeader serializes an internal header of exactly KeySchema.HeaderSize bytes.
FinishHeader(dst []byte)
}

// KeyComparison holds information about a key and its comparison to another a
Expand Down Expand Up @@ -177,6 +182,7 @@ var defaultSchemaColumnTypes = []DataType{
func DefaultKeySchema(comparer *base.Comparer, prefixBundleSize int) KeySchema {
return KeySchema{
Name: fmt.Sprintf("DefaultKeySchema(%s,%d)", comparer.Name, prefixBundleSize),
HeaderSize: 0,
ColumnTypes: defaultSchemaColumnTypes,
NewKeyWriter: func() KeyWriter {
kw := &defaultKeyWriter{comparer: comparer}
Expand Down Expand Up @@ -300,6 +306,8 @@ func (w *defaultKeyWriter) Size(rows int, offset uint32) uint32 {
return offset
}

func (w *defaultKeyWriter) FinishHeader([]byte) {}

func (w *defaultKeyWriter) Finish(col, rows int, offset uint32, buf []byte) (nextOffset uint32) {
switch col {
case defaultKeySchemaColumnPrefix:
Expand Down Expand Up @@ -464,6 +472,7 @@ const (
// contained within the block. This is used by iterators to avoid the need to
// grow key buffers while iterating over the block, ensuring that the key buffer
// is always sufficiently large.
// This is serialized immediately after the KeySchema specific header.
const dataBlockCustomHeaderSize = 4

// Init initializes the data block writer.
Expand Down Expand Up @@ -571,7 +580,7 @@ func (w *DataBlockEncoder) Rows() int {

// Size returns the size of the current pending data block.
func (w *DataBlockEncoder) Size() int {
off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, dataBlockCustomHeaderSize)
off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, dataBlockCustomHeaderSize+w.Schema.HeaderSize)
off = w.KeyWriter.Size(w.rows, off)
off = w.trailers.Size(w.rows, off)
off = w.prefixSame.InvertedSize(w.rows, off)
Expand Down Expand Up @@ -607,11 +616,12 @@ func (w *DataBlockEncoder) Finish(rows, size int) (finished []byte, lastKey base
// to represent when the prefix changes.
w.prefixSame.Invert(rows)

w.enc.init(size, h, dataBlockCustomHeaderSize)

// Write the max key length in the custom header.
binary.LittleEndian.PutUint32(w.enc.data()[:dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))
w.enc.init(size, h, dataBlockCustomHeaderSize+w.Schema.HeaderSize)

// Write the key schema custom header.
w.KeyWriter.FinishHeader(w.enc.data()[:w.Schema.HeaderSize])
// Write the max key length in the data block custom header.
binary.LittleEndian.PutUint32(w.enc.data()[w.Schema.HeaderSize:w.Schema.HeaderSize+dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))
w.enc.encode(rows, w.KeyWriter)
w.enc.encode(rows, &w.trailers)
w.enc.encode(rows, &w.prefixSame)
Expand Down Expand Up @@ -864,18 +874,23 @@ func (d *DataBlockDecoder) PrefixChanged() Bitmap {
return d.prefixChanged
}

// KeySchemaHeader returns the fixed-size, KeySchema-specific header stored at
// the start of the block. The caller supplies the schema's declared header
// size (KeySchema.HeaderSize).
func (d *DataBlockDecoder) KeySchemaHeader(schemaHeaderSize uint32) []byte {
	data := d.d.data
	return data[:schemaHeaderSize]
}

// Init initializes the data block reader with the given serialized data block.
func (d *DataBlockDecoder) Init(schema *KeySchema, data []byte) {
if uintptr(unsafe.Pointer(unsafe.SliceData(data)))&7 != 0 {
panic("data buffer not 8-byte aligned")
}
d.d.Init(data, dataBlockCustomHeaderSize)
d.d.Init(data, dataBlockCustomHeaderSize+schema.HeaderSize)
d.trailers = d.d.Uints(len(schema.ColumnTypes) + dataBlockColumnTrailer)
d.prefixChanged = d.d.Bitmap(len(schema.ColumnTypes) + dataBlockColumnPrefixChanged)
d.values = d.d.RawBytes(len(schema.ColumnTypes) + dataBlockColumnValue)
d.isValueExternal = d.d.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsValueExternal)
d.isObsolete = d.d.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsObsolete)
d.maximumKeyLength = binary.LittleEndian.Uint32(data[:dataBlockCustomHeaderSize])
d.maximumKeyLength = binary.LittleEndian.Uint32(data[schema.HeaderSize:])
}

// Describe describes the binary format of the data block, assuming f.Offset()
Expand Down
2 changes: 1 addition & 1 deletion testdata/event_listener
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 2.1KB
Compression types: snappy: 3
Block cache: 2 entries (768B) hit rate: 0.0%
Block cache: 2 entries (784B) hit rate: 0.0%
Table cache: 0 entries (0B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down
2 changes: 1 addition & 1 deletion testdata/ingest
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 569B
Compression types: snappy: 1
Block cache: 3 entries (1.0KB) hit rate: 18.2%
Block cache: 3 entries (1.1KB) hit rate: 18.2%
Table cache: 1 entries (760B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down
8 changes: 4 additions & 4 deletions testdata/metrics
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 0.0%
Block cache: 2 entries (716B) hit rate: 0.0%
Table cache: 1 entries (760B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down Expand Up @@ -132,7 +132,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Block cache: 2 entries (716B) hit rate: 33.3%
Table cache: 2 entries (1.5KB) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down Expand Up @@ -176,7 +176,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Block cache: 2 entries (716B) hit rate: 33.3%
Table cache: 2 entries (1.5KB) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down Expand Up @@ -217,7 +217,7 @@ Backing tables: 0 (0B)
Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 2 entries (700B) hit rate: 33.3%
Block cache: 2 entries (716B) hit rate: 33.3%
Table cache: 1 entries (760B) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Expand Down
Loading