Skip to content

Commit

Permalink
Configurable initial and max retained batch sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored and RaduBerinde committed Apr 11, 2024
1 parent 978bd26 commit d253ff7
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 20 deletions.
63 changes: 54 additions & 9 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

const (
batchInitialSize = 1 << 10 // 1 KB
batchMaxRetainedSize = 1 << 20 // 1 MB
invalidBatchCount = 1<<32 - 1
maxVarintLen32 = 5
invalidBatchCount = 1<<32 - 1
maxVarintLen32 = 5

defaultBatchInitialSize = 1 << 10 // 1 KB
defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
)

// ErrNotIndexed means that a read operation on a batch failed because the
Expand Down Expand Up @@ -271,6 +272,7 @@ type batchInternal struct {
cmp Compare
formatKey base.FormatKey
abbreviatedKey AbbreviatedKey
opts batchOptions

// An upper bound on required space to add this batch to a memtable.
// Note that although batches are limited to 4 GiB in size, that limit
Expand Down Expand Up @@ -440,14 +442,18 @@ var indexedBatchPool = sync.Pool{
},
}

func newBatch(db *DB) *Batch {
func newBatch(db *DB, opts ...BatchOption) *Batch {
b := batchPool.Get().(*Batch)
b.db = db
b.opts.ensureDefaults()
for _, opt := range opts {
opt(&b.opts)
}
return b
}

func newBatchWithSize(db *DB, size int) *Batch {
b := newBatch(db)
func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
b := newBatch(db, opts...)
if cap(b.data) < size {
b.data = rawalloc.New(0, size)
}
Expand All @@ -462,6 +468,7 @@ func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
i.batch.db = db
i.batch.index = &i.index
i.batch.index.Init(&i.batch.data, i.batch.cmp, i.batch.abbreviatedKey)
i.batch.opts.ensureDefaults()
return &i.batch
}

Expand Down Expand Up @@ -1510,7 +1517,8 @@ func (b *Batch) Indexed() bool {
// init ensures that the batch data slice is initialized to meet the
// minimum required size and allocates space for the batch header.
func (b *Batch) init(size int) {
n := batchInitialSize
b.opts.ensureDefaults()
n := b.opts.initialSizeBytes
for n < size {
n *= 2
}
Expand Down Expand Up @@ -1547,12 +1555,13 @@ func (b *Batch) reset() {
cmp: b.cmp,
formatKey: b.formatKey,
abbreviatedKey: b.abbreviatedKey,
opts: b.opts,
index: b.index,
db: b.db,
}
b.applied.Store(false)
if b.data != nil {
if cap(b.data) > batchMaxRetainedSize {
if cap(b.data) > b.opts.maxRetainedSizeBytes {
// If the capacity of the buffer is larger than our maximum
// retention size, don't re-use it. Let it be GC-ed instead.
// This prevents the memory from an unusually large batch from
Expand Down Expand Up @@ -2402,6 +2411,42 @@ func (i flushFlushableBatchIter) valueSize() uint64 {
return length
}

// batchOptions holds the parameters to configure batch.
type batchOptions struct {
initialSizeBytes int
maxRetainedSizeBytes int
}

// ensureDefaults creates batch options with default values.
func (o *batchOptions) ensureDefaults() {
if o.initialSizeBytes <= 0 {
o.initialSizeBytes = defaultBatchInitialSize
}
if o.maxRetainedSizeBytes <= 0 {
o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
}
}

// BatchOption is a functional option for customizing a batch. It receives a
// pointer to the batch's options and overwrites the fields it is responsible
// for. newBatch applies the supplied options in order, after the defaults
// have been filled in.
type BatchOption func(*batchOptions)

// WithInitialSizeBytes returns a BatchOption that overrides the initial
// size, in bytes, used when the batch's data buffer is first initialized.
// The default is 1KB. A non-positive value is treated as unset:
// ensureDefaults replaces it with the default the next time it runs.
func WithInitialSizeBytes(s int) BatchOption {
	setInitialSize := func(o *batchOptions) {
		o.initialSizeBytes = s
	}
	return setInitialSize
}

// WithMaxRetainedSizeBytes returns a BatchOption that overrides the largest
// data-buffer capacity, in bytes, that a batch keeps for re-use when it is
// reset. A buffer whose capacity exceeds this limit is dropped on reset and
// left to the garbage collector. The default is 1MB. A non-positive value is
// treated as unset: ensureDefaults replaces it with the default the next
// time it runs.
func WithMaxRetainedSizeBytes(s int) BatchOption {
	setMaxRetained := func(o *batchOptions) {
		o.maxRetainedSizeBytes = s
	}
	return setMaxRetained
}

// batchSort returns iterators for the sorted contents of the batch. It is
// intended for testing use only. The batch.Sort dance is done to prevent
// exposing this method in the public pebble interface.
Expand Down
61 changes: 54 additions & 7 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func TestBatch(t *testing.T) {
testBatch(t, 0)
testBatch(t, batchInitialSize)
testBatch(t, defaultBatchInitialSize)
}

func testBatch(t *testing.T, size int) {
Expand Down Expand Up @@ -206,9 +206,9 @@ func TestBatchPreAlloc(t *testing.T) {
size int
exp int
}{
{0, batchInitialSize},
{batchInitialSize, batchInitialSize},
{2 * batchInitialSize, 2 * batchInitialSize},
{0, defaultBatchInitialSize},
{defaultBatchInitialSize, defaultBatchInitialSize},
{2 * defaultBatchInitialSize, 2 * defaultBatchInitialSize},
}
for _, c := range cases {
b := newBatchWithSize(nil, c.size)
Expand Down Expand Up @@ -257,11 +257,12 @@ func TestBatchLen(t *testing.T) {

func TestBatchEmpty(t *testing.T) {
testBatchEmpty(t, 0)
testBatchEmpty(t, batchInitialSize)
testBatchEmpty(t, defaultBatchInitialSize)
testBatchEmpty(t, 0, WithInitialSizeBytes(2<<10), WithMaxRetainedSizeBytes(2<<20))
}

func testBatchEmpty(t *testing.T, size int) {
b := newBatchWithSize(nil, size)
func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) {
b := newBatchWithSize(nil, size, opts...)
require.True(t, b.Empty())

ops := []func(*Batch) error{
Expand Down Expand Up @@ -404,6 +405,8 @@ func TestBatchReset(t *testing.T) {
var expected Batch
require.NoError(t, expected.SetRepr(b.data))
expected.db = db
// Batch options should remain the same after reset.
expected.opts = b.opts
require.Equal(t, &expected, b)

// Reset batch can be used to write and commit a new record.
Expand Down Expand Up @@ -1720,3 +1723,47 @@ func TestBatchSpanCaching(t *testing.T) {
}
}
}

// TestBatchOption verifies that newBatch layers the supplied BatchOptions on
// top of the default batch options. Each case runs as a named subtest so a
// failure identifies which option combination broke.
func TestBatchOption(t *testing.T) {
	for _, tc := range []struct {
		name     string
		opts     []BatchOption
		expected *Batch
	}{
		{
			name: "default",
			opts: nil,
			expected: &Batch{batchInternal: batchInternal{
				opts: batchOptions{
					initialSizeBytes:     defaultBatchInitialSize,
					maxRetainedSizeBytes: defaultBatchMaxRetainedSize,
				},
			}},
		},
		{
			name: "with_custom_initial_size",
			opts: []BatchOption{WithInitialSizeBytes(2 << 10)},
			expected: &Batch{batchInternal: batchInternal{
				opts: batchOptions{
					initialSizeBytes:     2 << 10,
					maxRetainedSizeBytes: defaultBatchMaxRetainedSize,
				},
			}},
		},
		{
			name: "with_custom_max_retained_size",
			opts: []BatchOption{WithMaxRetainedSizeBytes(2 << 10)},
			expected: &Batch{batchInternal: batchInternal{
				opts: batchOptions{
					initialSizeBytes:     defaultBatchInitialSize,
					maxRetainedSizeBytes: 2 << 10,
				},
			}},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			b := newBatch(nil, tc.opts...)
			// newBatch returns a batch from the pool, so it is possible for
			// len(data) to be > 0. Clear it so the batch can be compared
			// against an expected value that only sets opts.
			b.data = nil
			require.Equal(t, tc.expected, b)
		})
	}
}
8 changes: 4 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,14 +1494,14 @@ func (i *Iterator) constructPointIter(

// NewBatch returns a new empty write-only batch. Any reads on the batch will
// return an error. If the batch is committed it will be applied to the DB.
func (d *DB) NewBatch() *Batch {
return newBatch(d)
func (d *DB) NewBatch(opts ...BatchOption) *Batch {
return newBatch(d, opts...)
}

// NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
// specified memory space for the internal slice in advance.
func (d *DB) NewBatchWithSize(size int) *Batch {
return newBatchWithSize(d, size)
func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
return newBatchWithSize(d, size, opts...)
}

// NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
Expand Down

0 comments on commit d253ff7

Please sign in to comment.