Skip to content

Commit

Permalink
sstable: move empty block property logic to encoder/decoder
Browse files Browse the repository at this point in the history
We omit block properties that are empty when we encode. When we
decode, we have to handle any missing properties as empty. The code
that does this is hard to follow. This change moves this logic into
the encoder/decoder, which makes things much cleaner.
  • Loading branch information
RaduBerinde committed Apr 11, 2024
1 parent af7e51a commit 978bd26
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 128 deletions.
141 changes: 82 additions & 59 deletions sstable/block_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"unsafe"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/rangekey"
)

Expand Down Expand Up @@ -524,11 +525,28 @@ func (b *BlockIntervalFilter) SetInterval(lower, upper uint64) {
b.filterInterval = interval{lower: lower, upper: upper}
}

// When encoding block properties for each block, we cannot afford to encode
// the name. Instead, the name is mapped to a shortID, in the scope of that
// sstable, and the shortID is encoded. Since we use a uint8, there is a limit
// of 256 block property collectors per sstable.
type shortID uint8
// When encoding block properties for each block, we cannot afford to encode the
// name. Instead, the name is mapped to a shortID, in the scope of that sstable,
// and the shortID is encoded as a single byte (which imposes a limit of of 256
// block property collectors per sstable).
// Note that the in-memory type is int16 to avoid overflows (e.g. in loops) and
// to allow special values like -1 in code.
type shortID int16

const invalidShortID shortID = -1
const maxShortID shortID = math.MaxUint8
const maxPropertyCollectors = int(maxShortID) + 1

func (id shortID) IsValid() bool {
return id >= 0 && id <= maxShortID
}

func (id shortID) ToByte() byte {
if invariants.Enabled && !id.IsValid() {
panic(fmt.Sprintf("inavlid id %d", id))
}
return byte(id)
}

type blockPropertiesEncoder struct {
propsBuf []byte
Expand All @@ -544,6 +562,11 @@ func (e *blockPropertiesEncoder) resetProps() {
}

func (e *blockPropertiesEncoder) addProp(id shortID, scratch []byte) {
if len(scratch) == 0 {
// We omit empty properties. The decoder will know that any missing IDs had
// empty values.
return
}
const lenID = 1
lenProp := uvarintLen(uint32(len(scratch)))
n := lenID + lenProp + len(scratch)
Expand All @@ -558,7 +581,7 @@ func (e *blockPropertiesEncoder) addProp(id shortID, scratch []byte) {
}
pos := len(e.propsBuf)
b := e.propsBuf[pos : pos+lenID]
b[0] = byte(id)
b[0] = id.ToByte()
pos += lenID
b = e.propsBuf[pos : pos+lenProp]
n = binary.PutUvarint(b, uint64(len(scratch)))
Expand All @@ -582,16 +605,41 @@ func (e *blockPropertiesEncoder) props() []byte {

type blockPropertiesDecoder struct {
props []byte

// numCollectedProps is the number of collectors that were used when writing
// these properties. The encoded properties contain values for shortIDs 0
// through numCollectedProps-1, in order (with empty properties omitted).
numCollectedProps int
nextID shortID
}

func makeBlockPropertiesDecoder(numCollectedProps int, propsBuf []byte) blockPropertiesDecoder {
return blockPropertiesDecoder{
props: propsBuf,
numCollectedProps: numCollectedProps,
}
}

func (d *blockPropertiesDecoder) done() bool {
return len(d.props) == 0
func (d *blockPropertiesDecoder) Done() bool {
return int(d.nextID) >= d.numCollectedProps
}

// REQUIRES: !done()
func (d *blockPropertiesDecoder) next() (id shortID, prop []byte, err error) {
// Next returns the property for each shortID between 0 and numCollectedProps-1, in order.
// Note that some properties might be empty.
// REQUIRES: !Done()
func (d *blockPropertiesDecoder) Next() (id shortID, prop []byte, err error) {
id = d.nextID
d.nextID++

if len(d.props) == 0 || shortID(d.props[0]) != id {
if invariants.Enabled && len(d.props) > 0 && shortID(d.props[0]) < id {
panic("shortIDs are not in order")
}
// This property was omitted because it was empty.
return id, nil, nil
}

const lenID = 1
id = shortID(d.props[0])
propLen, m := binary.Uvarint(d.props[lenID:])
n := lenID + m
if m <= 0 || propLen == 0 || (n+int(propLen)) > len(d.props) {
Expand Down Expand Up @@ -811,64 +859,39 @@ const (
blockMaybeExcluded
)

func (f *BlockPropertiesFilterer) intersects(props []byte) (ret intersectsResult, err error) {
i := 0
decoder := blockPropertiesDecoder{props: props}
ret = blockIntersects
for i < len(f.shortIDToFiltersIndex) {
var id int
var prop []byte
if !decoder.done() {
var shortID shortID
var err error
shortID, prop, err = decoder.next()
if err != nil {
return ret, err
}
id = int(shortID)
} else {
id = math.MaxUint8 + 1
}
for i < len(f.shortIDToFiltersIndex) && id > i {
// The property for this id is not encoded for this block, but there
// may still be a filter for this id.
if intersects, err := f.intersectsFilter(i, nil); err != nil {
return ret, err
} else if intersects == blockExcluded {
return blockExcluded, nil
} else if intersects == blockMaybeExcluded {
ret = blockMaybeExcluded
}
i++
}
if i >= len(f.shortIDToFiltersIndex) {
return ret, nil
}
// INVARIANT: id <= i. And since i is always incremented by 1, id==i.
if id != i {
panic(fmt.Sprintf("%d != %d", id, i))
func (f *BlockPropertiesFilterer) intersects(props []byte) (intersectsResult, error) {
decoder := makeBlockPropertiesDecoder(len(f.shortIDToFiltersIndex), props)
ret := blockIntersects
for !decoder.Done() {
id, prop, err := decoder.Next()
if err != nil {
return ret, err
}
if intersects, err := f.intersectsFilter(i, prop); err != nil {
intersects, err := f.intersectsFilter(id, prop)
if err != nil {
return ret, err
} else if intersects == blockExcluded {
}
if intersects == blockExcluded {
return blockExcluded, nil
} else if intersects == blockMaybeExcluded {
}
if intersects == blockMaybeExcluded {
ret = blockMaybeExcluded
}
i++
}
// ret == blockIntersects || ret == blockMaybeExcluded
// ret is either blockIntersects or blockMaybeExcluded.
return ret, nil
}

func (f *BlockPropertiesFilterer) intersectsFilter(i int, prop []byte) (intersectsResult, error) {
func (f *BlockPropertiesFilterer) intersectsFilter(
id shortID, prop []byte,
) (intersectsResult, error) {
var intersects bool
var err error
if f.shortIDToFiltersIndex[i] >= 0 {
if len(f.syntheticSuffix) == 0 {
intersects, err = f.filters[f.shortIDToFiltersIndex[i]].Intersects(prop)
if filterIdx := f.shortIDToFiltersIndex[id]; filterIdx >= 0 {
if !f.syntheticSuffix.IsSet() {
intersects, err = f.filters[filterIdx].Intersects(prop)
} else {
intersects, err = f.filters[f.shortIDToFiltersIndex[i]].SyntheticSuffixIntersects(prop, f.syntheticSuffix)
intersects, err = f.filters[filterIdx].SyntheticSuffixIntersects(prop, f.syntheticSuffix)
}
if err != nil {
return blockIntersects, err
Expand All @@ -877,15 +900,15 @@ func (f *BlockPropertiesFilterer) intersectsFilter(i int, prop []byte) (intersec
return blockExcluded, nil
}
}
if i == f.boundLimitedShortID {
if int(id) == f.boundLimitedShortID {
// The bound-limited filter uses this id.
//
// The bound-limited filter only applies within a keyspan interval. We
// expect the Intersects call to be cheaper than bounds checks. If
// Intersects determines that there is no intersection, we return
// `blockMaybeExcluded` if no other bpf unconditionally excludes the
// block.
if len(f.syntheticSuffix) == 0 {
if !f.syntheticSuffix.IsSet() {
intersects, err = f.boundLimitedFilter.Intersects(prop)
} else {
intersects, err = f.boundLimitedFilter.SyntheticSuffixIntersects(prop, f.syntheticSuffix)
Expand Down
52 changes: 30 additions & 22 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,19 +306,25 @@ func TestBlockPropertiesEncoderDecoder(t *testing.T) {
props1 := encoder.props()
unsafeProps := encoder.unsafeProps()
require.True(t, bytes.Equal(props1, unsafeProps), "%x != %x", props1, unsafeProps)
decodeProps1 := func() {
decoder := blockPropertiesDecoder{props: props1}
require.False(t, decoder.done())
id, prop, err := decoder.next()
require.NoError(t, err)
require.Equal(t, shortID(1), id)
require.Equal(t, string(prop), "foo")
require.False(t, decoder.done())
id, prop, err = decoder.next()

expect := func(decoder *blockPropertiesDecoder, expectedID shortID, expectedVal string) {
t.Helper()
require.False(t, decoder.Done())
id, prop, err := decoder.Next()
require.NoError(t, err)
require.Equal(t, shortID(10), id)
require.Equal(t, string(prop), "cockroach")
require.True(t, decoder.done())
require.Equal(t, expectedID, id)
require.Equal(t, string(prop), expectedVal)
}

decodeProps1 := func() {
decoder := makeBlockPropertiesDecoder(11, props1)
expect(&decoder, 0, "")
expect(&decoder, 1, "foo")
for i := shortID(2); i < 10; i++ {
expect(&decoder, i, "")
}
expect(&decoder, 10, "cockroach")
require.True(t, decoder.Done())
}
decodeProps1()

Expand All @@ -333,13 +339,12 @@ func TestBlockPropertiesEncoderDecoder(t *testing.T) {
// Safe props should still decode.
decodeProps1()
// Decode props2
decoder := blockPropertiesDecoder{props: props2}
require.False(t, decoder.done())
id, prop, err := decoder.next()
require.NoError(t, err)
require.Equal(t, shortID(10), id)
require.Equal(t, string(prop), "bar")
require.True(t, decoder.done())
decoder := makeBlockPropertiesDecoder(11, props2)
for i := shortID(0); i < 10; i++ {
expect(&decoder, i, "")
}
expect(&decoder, 10, "bar")
require.True(t, decoder.Done())
}

// filterWithTrueForEmptyProp is a wrapper for BlockPropertyFilter that
Expand Down Expand Up @@ -1354,13 +1359,16 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
defer bh.Release()
var sb strings.Builder
decodeProps := func(props []byte, indent string) error {
d := blockPropertiesDecoder{props: props}
d := makeBlockPropertiesDecoder(11, props)
var lines []string
for !d.done() {
id, prop, err := d.next()
for !d.Done() {
id, prop, err := d.Next()
if err != nil {
return err
}
if prop == nil {
continue
}
var i interval
if err := i.decode(prop); err != nil {
return err
Expand Down
22 changes: 16 additions & 6 deletions sstable/suffix_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,24 @@ func rewriteDataBlocksToWriter(
return err
}

var decoder blockPropertiesDecoder
// oldShortIDs maps the shortID for the block property collector in the old
// blocks to the shortID in the new blocks. Initialized once for the sstable.
var oldShortIDs []shortID
// oldProps is the property value in an old block, indexed by the shortID in
// the new block. Slice is reused for each block.
var oldProps [][]byte
if len(w.blockPropCollectors) > 0 {
oldProps = make([][]byte, len(w.blockPropCollectors))
oldShortIDs = make([]shortID, math.MaxUint8)
for i := range oldShortIDs {
oldShortIDs[i] = invalidShortID
}
for i, p := range w.blockPropCollectors {
if prop, ok := r.Properties.UserProperties[p.Name()]; ok {
was, is := shortID(byte(prop[0])), shortID(i)
was, is := shortID(prop[0]), shortID(i)
oldShortIDs[was] = is
} else {
return errors.Errorf("sstable does not contain property %s", p.Name())
}
}
}
Expand All @@ -346,13 +354,15 @@ func rewriteDataBlocksToWriter(
for i := range oldProps {
oldProps[i] = nil
}
decoder.props = data[i].Props
for !decoder.done() {
id, val, err := decoder.next()
decoder := makeBlockPropertiesDecoder(len(oldProps), data[i].Props)
for !decoder.Done() {
id, val, err := decoder.Next()
if err != nil {
return err
}
oldProps[oldShortIDs[id]] = val
if oldShortIDs[id].IsValid() {
oldProps[oldShortIDs[id]] = val
}
}

for i, p := range w.blockPropCollectors {
Expand Down
18 changes: 9 additions & 9 deletions sstable/suffix_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func TestRewriteSuffixProps(t *testing.T) {
if byBlocks {
_, rewriteFormat, err := rewriteKeySuffixesInBlocks(
r, rewrittenSST, rwOpts, from, to, 8)
require.NoError(t, err)
// rewriteFormat is equal to the original format, since
// rwOpts.TableFormat is ignored.
require.Equal(t, wOpts.TableFormat, rewriteFormat)
require.NoError(t, err)
adjustPropsForEffectiveFormat(rewriteFormat)
} else {
_, err := RewriteKeySuffixesViaWriter(r, rewrittenSST, rwOpts, from, to)
Expand Down Expand Up @@ -114,25 +114,25 @@ func TestRewriteSuffixProps(t *testing.T) {
ival := interval{}
for i := range layout.Data {
oldProps := make([][]byte, len(wOpts.BlockPropertyCollectors))
oldDecoder := blockPropertiesDecoder{layout.Data[i].Props}
for !oldDecoder.done() {
id, val, err := oldDecoder.next()
oldDecoder := makeBlockPropertiesDecoder(len(oldProps), layout.Data[i].Props)
for !oldDecoder.Done() {
id, val, err := oldDecoder.Next()
require.NoError(t, err)
oldProps[id] = val
}
newProps := make([][]byte, len(rwOpts.BlockPropertyCollectors))
newDecoder := blockPropertiesDecoder{newLayout.Data[i].Props}
for !newDecoder.done() {
id, val, err := newDecoder.next()
newDecoder := makeBlockPropertiesDecoder(len(newProps), newLayout.Data[i].Props)
for !newDecoder.Done() {
id, val, err := newDecoder.Next()
require.NoError(t, err)
if int(id) < len(newProps) {
newProps[id] = val
}
}
require.Equal(t, oldProps[0], newProps[1])
ival.decode(newProps[0])
require.NoError(t, ival.decode(newProps[0]))
require.Equal(t, interval{646, 647}, ival)
ival.decode(newProps[2])
require.NoError(t, ival.decode(newProps[2]))
require.Equal(t, interval{46, 47}, ival)
}
})
Expand Down
Loading

0 comments on commit 978bd26

Please sign in to comment.