diff --git a/batch.go b/batch.go
index 0adc9030f1..f9855d0554 100644
--- a/batch.go
+++ b/batch.go
@@ -30,10 +30,11 @@ import (
 )
 
 const (
-	batchInitialSize     = 1 << 10 // 1 KB
-	batchMaxRetainedSize = 1 << 20 // 1 MB
-	invalidBatchCount    = 1<<32 - 1
-	maxVarintLen32       = 5
+	invalidBatchCount = 1<<32 - 1
+	maxVarintLen32    = 5
+
+	defaultBatchInitialSize     = 1 << 10 // 1 KB
+	defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
 )
 
 // ErrNotIndexed means that a read operation on a batch failed because the
@@ -271,6 +272,7 @@ type batchInternal struct {
 	cmp            Compare
 	formatKey      base.FormatKey
 	abbreviatedKey AbbreviatedKey
+	opts           batchOptions
 
 	// An upper bound on required space to add this batch to a memtable.
 	// Note that although batches are limited to 4 GiB in size, that limit
@@ -440,14 +442,18 @@ var indexedBatchPool = sync.Pool{
 	},
 }
 
-func newBatch(db *DB) *Batch {
+func newBatch(db *DB, opts ...BatchOption) *Batch {
 	b := batchPool.Get().(*Batch)
 	b.db = db
+	b.opts.ensureDefaults()
+	for _, opt := range opts {
+		opt(&b.opts)
+	}
 	return b
 }
 
-func newBatchWithSize(db *DB, size int) *Batch {
-	b := newBatch(db)
+func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
+	b := newBatch(db, opts...)
 	if cap(b.data) < size {
 		b.data = rawalloc.New(0, size)
 	}
@@ -462,6 +468,7 @@ func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
 	i.batch.db = db
 	i.batch.index = &i.index
 	i.batch.index.Init(&i.batch.data, i.batch.cmp, i.batch.abbreviatedKey)
+	i.batch.opts.ensureDefaults()
 	return &i.batch
 }
 
@@ -1510,7 +1517,8 @@ func (b *Batch) Indexed() bool {
 // init ensures that the batch data slice is initialized to meet the
 // minimum required size and allocates space for the batch header.
 func (b *Batch) init(size int) {
-	n := batchInitialSize
+	b.opts.ensureDefaults()
+	n := b.opts.initialSizeBytes
 	for n < size {
 		n *= 2
 	}
@@ -1547,12 +1555,13 @@ func (b *Batch) reset() {
 		cmp:            b.cmp,
 		formatKey:      b.formatKey,
 		abbreviatedKey: b.abbreviatedKey,
+		opts:           b.opts,
 		index:          b.index,
 		db:             b.db,
 	}
 	b.applied.Store(false)
 	if b.data != nil {
-		if cap(b.data) > batchMaxRetainedSize {
+		if cap(b.data) > b.opts.maxRetainedSizeBytes {
 			// If the capacity of the buffer is larger than our maximum
 			// retention size, don't re-use it. Let it be GC-ed instead.
 			// This prevents the memory from an unusually large batch from
@@ -2402,6 +2411,42 @@ func (i flushFlushableBatchIter) valueSize() uint64 {
 	return length
 }
 
+// batchOptions holds the parameters used to configure a batch.
+type batchOptions struct {
+	initialSizeBytes     int
+	maxRetainedSizeBytes int
+}
+
+// ensureDefaults fills in default values for any unset batch options.
+func (o *batchOptions) ensureDefaults() {
+	if o.initialSizeBytes <= 0 {
+		o.initialSizeBytes = defaultBatchInitialSize
+	}
+	if o.maxRetainedSizeBytes <= 0 {
+		o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
+	}
+}
+
+// BatchOption allows customizing the batch.
+type BatchOption func(*batchOptions)
+
+// WithInitialSizeBytes sets a custom initial allocation size, in bytes, for
+// the batch. Defaults to 1 KB.
+func WithInitialSizeBytes(s int) BatchOption {
+	return func(opts *batchOptions) {
+		opts.initialSizeBytes = s
+	}
+}
+
+// WithMaxRetainedSizeBytes sets a custom maximum size, in bytes, for the
+// batch buffer to be re-used. Any batch whose buffer exceeds the max
+// retained size is left to be GC-ed instead. Defaults to 1 MB.
+func WithMaxRetainedSizeBytes(s int) BatchOption {
+	return func(opts *batchOptions) {
+		opts.maxRetainedSizeBytes = s
+	}
+}
+
 // batchSort returns iterators for the sorted contents of the batch. It is
 // intended for testing use only. The batch.Sort dance is done to prevent
 // exposing this method in the public pebble interface.
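
For readers unfamiliar with the functional-options pattern used above, the sketch below shows how newBatch resolves the configuration: ensureDefaults fills in the zero-valued fields first, and each caller-supplied BatchOption then overrides its field. The helper name applyBatchOptions is hypothetical and exists only for illustration; it is not part of the patch.

// applyBatchOptions mirrors the option handling in newBatch: defaults are
// applied first, then each BatchOption overrides the corresponding field.
func applyBatchOptions(opts ...BatchOption) batchOptions {
	var o batchOptions
	o.ensureDefaults() // 1 KB initial size, 1 MB retained size when unset
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

Note that init calls ensureDefaults again before sizing the buffer, so a zero-valued batchOptions (for example, on a Batch constructed directly rather than through newBatch) still falls back to the defaults.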
diff --git a/batch_test.go b/batch_test.go
index 8687bc25b4..82985a673f 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -33,7 +33,7 @@ import (
 
 func TestBatch(t *testing.T) {
 	testBatch(t, 0)
-	testBatch(t, batchInitialSize)
+	testBatch(t, defaultBatchInitialSize)
 }
 
 func testBatch(t *testing.T, size int) {
@@ -206,9 +206,9 @@ func TestBatchPreAlloc(t *testing.T) {
 		size int
 		exp  int
 	}{
-		{0, batchInitialSize},
-		{batchInitialSize, batchInitialSize},
-		{2 * batchInitialSize, 2 * batchInitialSize},
+		{0, defaultBatchInitialSize},
+		{defaultBatchInitialSize, defaultBatchInitialSize},
+		{2 * defaultBatchInitialSize, 2 * defaultBatchInitialSize},
 	}
 	for _, c := range cases {
 		b := newBatchWithSize(nil, c.size)
@@ -257,11 +257,12 @@ func TestBatchLen(t *testing.T) {
 
 func TestBatchEmpty(t *testing.T) {
 	testBatchEmpty(t, 0)
-	testBatchEmpty(t, batchInitialSize)
+	testBatchEmpty(t, defaultBatchInitialSize)
+	testBatchEmpty(t, 0, WithInitialSizeBytes(2<<10), WithMaxRetainedSizeBytes(2<<20))
 }
 
-func testBatchEmpty(t *testing.T, size int) {
-	b := newBatchWithSize(nil, size)
+func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) {
+	b := newBatchWithSize(nil, size, opts...)
 	require.True(t, b.Empty())
 
 	ops := []func(*Batch) error{
@@ -404,6 +405,8 @@ func TestBatchReset(t *testing.T) {
 	var expected Batch
 	require.NoError(t, expected.SetRepr(b.data))
 	expected.db = db
+	// Batch options should remain the same after reset.
+	expected.opts = b.opts
 	require.Equal(t, &expected, b)
 
 	// Reset batch can be used to write and commit a new record.
@@ -1720,3 +1723,47 @@ func TestBatchSpanCaching(t *testing.T) {
 		}
 	}
 }
+
+func TestBatchOption(t *testing.T) {
+	for _, tc := range []struct {
+		name     string
+		opts     []BatchOption
+		expected *Batch
+	}{
+		{
+			name: "default",
+			opts: nil,
+			expected: &Batch{batchInternal: batchInternal{
+				opts: batchOptions{
+					initialSizeBytes:     defaultBatchInitialSize,
+					maxRetainedSizeBytes: defaultBatchMaxRetainedSize,
+				},
+			}},
+		},
+		{
+			name: "with_custom_initial_size",
+			opts: []BatchOption{WithInitialSizeBytes(2 << 10)},
+			expected: &Batch{batchInternal: batchInternal{
+				opts: batchOptions{
+					initialSizeBytes:     2 << 10,
+					maxRetainedSizeBytes: defaultBatchMaxRetainedSize,
+				},
+			}},
+		},
+		{
+			name: "with_custom_max_retained_size",
+			opts: []BatchOption{WithMaxRetainedSizeBytes(2 << 10)},
+			expected: &Batch{batchInternal: batchInternal{
+				opts: batchOptions{
+					initialSizeBytes:     defaultBatchInitialSize,
+					maxRetainedSizeBytes: 2 << 10,
+				},
+			}},
+		},
+	} {
+		b := newBatch(nil, tc.opts...)
+		// newBatch may return a batch from the pool, so it is possible for len(data) to be > 0.
+		b.data = nil
+		require.Equal(t, tc.expected, b)
+	}
+}
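
The tests above pin down the defaults and the option plumbing. A natural in-package follow-up, shown here only as a sketch (the test name is hypothetical and it is not part of this patch), would exercise the retention limit end to end: a batch configured with a small WithMaxRetainedSizeBytes should drop its buffer on Reset once the buffer has grown past that limit. This assumes Batch.Reset routes through reset() and that Set works on a batch with a nil *DB, as the existing tests already rely on.

func TestBatchResetDropsOversizedBuffer(t *testing.T) {
	// Configure a 1 KB retention limit, then grow the buffer past it.
	b := newBatch(nil, WithMaxRetainedSizeBytes(1<<10))
	require.NoError(t, b.Set(make([]byte, 2<<10), nil, nil))
	require.Greater(t, cap(b.data), 1<<10)
	// reset() should release the oversized buffer instead of retaining it.
	b.Reset()
	require.Zero(t, cap(b.data))
}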
diff --git a/db.go b/db.go
index bbd68a7489..8b855930fc 100644
--- a/db.go
+++ b/db.go
@@ -1494,14 +1494,14 @@ func (i *Iterator) constructPointIter(
 
 // NewBatch returns a new empty write-only batch. Any reads on the batch will
 // return an error. If the batch is committed it will be applied to the DB.
-func (d *DB) NewBatch() *Batch {
-	return newBatch(d)
+func (d *DB) NewBatch(opts ...BatchOption) *Batch {
+	return newBatch(d, opts...)
 }
 
 // NewBatchWithSize is mostly identical to NewBatch, but it will allocate
 // the specified memory space for the internal slice in advance.
-func (d *DB) NewBatchWithSize(size int) *Batch {
-	return newBatchWithSize(d, size)
+func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
+	return newBatchWithSize(d, size, opts...)
 }
 
 // NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
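
Taken together, the variadic BatchOption parameters let callers tune batch sizing at creation time without touching the package-level defaults. A usage sketch, assuming this patch is applied; the directory name, key, value, and sizes are arbitrary.

package main

import "github.com/cockroachdb/pebble"

func main() {
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Start the batch buffer at 4 KB and allow buffers up to 4 MB to be
	// re-used across Reset calls; anything larger is released to the GC.
	b := db.NewBatch(
		pebble.WithInitialSizeBytes(4<<10),
		pebble.WithMaxRetainedSizeBytes(4<<20),
	)
	defer b.Close()

	if err := b.Set([]byte("key"), []byte("value"), nil); err != nil {
		panic(err)
	}
	if err := b.Commit(pebble.Sync); err != nil {
		panic(err)
	}
}

Callers that pass no options see no behavior change, since ensureDefaults restores the previous 1 KB initial size and 1 MB retention limit.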