Skip to content

Configurable initial and max retained batch sizes #3139

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

const (
batchInitialSize = 1 << 10 // 1 KB
batchMaxRetainedSize = 1 << 20 // 1 MB
invalidBatchCount = 1<<32 - 1
maxVarintLen32 = 5
invalidBatchCount = 1<<32 - 1
maxVarintLen32 = 5

defaultBatchInitialSize = 1 << 10 // 1 KB
defaultBatchMaxRetainedSize = 1 << 20 // 1 MB
)

// ErrNotIndexed means that a read operation on a batch failed because the
Expand Down Expand Up @@ -271,6 +272,7 @@ type batchInternal struct {
cmp Compare
formatKey base.FormatKey
abbreviatedKey AbbreviatedKey
opts batchOptions

// An upper bound on required space to add this batch to a memtable.
// Note that although batches are limited to 4 GiB in size, that limit
Expand Down Expand Up @@ -440,14 +442,18 @@ var indexedBatchPool = sync.Pool{
},
}

func newBatch(db *DB) *Batch {
func newBatch(db *DB, opts ...BatchOption) *Batch {
b := batchPool.Get().(*Batch)
b.db = db
b.opts.ensureDefaults()
for _, opt := range opts {
opt(&b.opts)
}
return b
}

func newBatchWithSize(db *DB, size int) *Batch {
b := newBatch(db)
func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {
b := newBatch(db, opts...)
if cap(b.data) < size {
b.data = rawalloc.New(0, size)
}
Expand All @@ -462,6 +468,7 @@ func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
i.batch.db = db
i.batch.index = &i.index
i.batch.index.Init(&i.batch.data, i.batch.cmp, i.batch.abbreviatedKey)
i.batch.opts.ensureDefaults()
return &i.batch
}

Expand Down Expand Up @@ -1510,7 +1517,8 @@ func (b *Batch) Indexed() bool {
// init ensures that the batch data slice is initialized to meet the
// minimum required size and allocates space for the batch header.
func (b *Batch) init(size int) {
n := batchInitialSize
b.opts.ensureDefaults()
n := b.opts.initialSizeBytes
for n < size {
n *= 2
}
Expand Down Expand Up @@ -1547,12 +1555,13 @@ func (b *Batch) reset() {
cmp: b.cmp,
formatKey: b.formatKey,
abbreviatedKey: b.abbreviatedKey,
opts: b.opts,
index: b.index,
db: b.db,
}
b.applied.Store(false)
if b.data != nil {
if cap(b.data) > batchMaxRetainedSize {
if cap(b.data) > b.opts.maxRetainedSizeBytes {
// If the capacity of the buffer is larger than our maximum
// retention size, don't re-use it. Let it be GC-ed instead.
// This prevents the memory from an unusually large batch from
Expand Down Expand Up @@ -2402,6 +2411,42 @@ func (i flushFlushableBatchIter) valueSize() uint64 {
return length
}

// batchOptions holds the parameters to configure batch.
type batchOptions struct {
initialSizeBytes int
maxRetainedSizeBytes int
}

// ensureDefaults creates batch options with default values.
func (o *batchOptions) ensureDefaults() {
if o.initialSizeBytes <= 0 {
o.initialSizeBytes = defaultBatchInitialSize
}
if o.maxRetainedSizeBytes <= 0 {
o.maxRetainedSizeBytes = defaultBatchMaxRetainedSize
}
}

// BatchOption allows customizing the batch.
type BatchOption func(*batchOptions)

// WithInitialSizeBytes sets a custom initial size for the batch. Defaults
// to 1KB.
func WithInitialSizeBytes(s int) BatchOption {
return func(opts *batchOptions) {
opts.initialSizeBytes = s
}
}

// WithMaxRetainedSizeBytes sets a custom max size for the batch to be
// re-used. Any batch which exceeds the max retained size would be GC-ed.
// Defaults to 1MB.
func WithMaxRetainedSizeBytes(s int) BatchOption {
return func(opts *batchOptions) {
opts.maxRetainedSizeBytes = s
}
}

// batchSort returns iterators for the sorted contents of the batch. It is
// intended for testing use only. The batch.Sort dance is done to prevent
// exposing this method in the public pebble interface.
Expand Down
61 changes: 54 additions & 7 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func TestBatch(t *testing.T) {
testBatch(t, 0)
testBatch(t, batchInitialSize)
testBatch(t, defaultBatchInitialSize)
}

func testBatch(t *testing.T, size int) {
Expand Down Expand Up @@ -206,9 +206,9 @@ func TestBatchPreAlloc(t *testing.T) {
size int
exp int
}{
{0, batchInitialSize},
{batchInitialSize, batchInitialSize},
{2 * batchInitialSize, 2 * batchInitialSize},
{0, defaultBatchInitialSize},
{defaultBatchInitialSize, defaultBatchInitialSize},
{2 * defaultBatchInitialSize, 2 * defaultBatchInitialSize},
}
for _, c := range cases {
b := newBatchWithSize(nil, c.size)
Expand Down Expand Up @@ -257,11 +257,12 @@ func TestBatchLen(t *testing.T) {

func TestBatchEmpty(t *testing.T) {
testBatchEmpty(t, 0)
testBatchEmpty(t, batchInitialSize)
testBatchEmpty(t, defaultBatchInitialSize)
testBatchEmpty(t, 0, WithInitialSizeBytes(2<<10), WithMaxRetainedSizeBytes(2<<20))
}

func testBatchEmpty(t *testing.T, size int) {
b := newBatchWithSize(nil, size)
func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) {
b := newBatchWithSize(nil, size, opts...)
require.True(t, b.Empty())

ops := []func(*Batch) error{
Expand Down Expand Up @@ -404,6 +405,8 @@ func TestBatchReset(t *testing.T) {
var expected Batch
require.NoError(t, expected.SetRepr(b.data))
expected.db = db
// Batch options should remain same after reset.
expected.opts = b.opts
require.Equal(t, &expected, b)

// Reset batch can be used to write and commit a new record.
Expand Down Expand Up @@ -1720,3 +1723,47 @@ func TestBatchSpanCaching(t *testing.T) {
}
}
}

func TestBatchOption(t *testing.T) {
for _, tc := range []struct {
name string
opts []BatchOption
expected *Batch
}{
{
name: "default",
opts: nil,
expected: &Batch{batchInternal: batchInternal{
opts: batchOptions{
initialSizeBytes: defaultBatchInitialSize,
maxRetainedSizeBytes: defaultBatchMaxRetainedSize,
},
}},
},
{
name: "with_custom_initial_size",
opts: []BatchOption{WithInitialSizeBytes(2 << 10)},
expected: &Batch{batchInternal: batchInternal{
opts: batchOptions{
initialSizeBytes: 2 << 10,
maxRetainedSizeBytes: defaultBatchMaxRetainedSize,
},
}},
},
{
name: "with_custom_max_retained_size",
opts: []BatchOption{WithMaxRetainedSizeBytes(2 << 10)},
expected: &Batch{batchInternal: batchInternal{
opts: batchOptions{
initialSizeBytes: defaultBatchInitialSize,
maxRetainedSizeBytes: 2 << 10,
},
}},
},
} {
b := newBatch(nil, tc.opts...)
// newBatch returns batch from the pool so it is possible for len(data) to be > 0
b.data = nil
require.Equal(t, tc.expected, b)
}
}
8 changes: 4 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,14 +1494,14 @@ func (i *Iterator) constructPointIter(

// NewBatch returns a new empty write-only batch. Any reads on the batch will
// return an error. If the batch is committed it will be applied to the DB.
func (d *DB) NewBatch() *Batch {
return newBatch(d)
func (d *DB) NewBatch(opts ...BatchOption) *Batch {
return newBatch(d, opts...)
}

// NewBatchWithSize is mostly identical to NewBatch, but it will allocate the
// the specified memory space for the internal slice in advance.
func (d *DB) NewBatchWithSize(size int) *Batch {
return newBatchWithSize(d, size)
func (d *DB) NewBatchWithSize(size int, opts ...BatchOption) *Batch {
return newBatchWithSize(d, size, opts...)
}

// NewIndexedBatch returns a new empty read-write batch. Any reads on the batch
Expand Down