feat(stream): add support for incremental stream writer (#1722) #1874

Merged: 1 commit, Mar 7, 2023
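Summary: this PR teaches StreamWriter an incremental mode. Prepare still drops all existing data before a bulk load, while the new PrepareIncremental keeps existing data and writes the incoming tables one level above the current base level. A hedged usage sketch assembled from the tests in this diff (KVToBuffer is a helper in Badger's test files, not a public API; error handling is trimmed):

	sw := db.NewStreamWriter()

	// Unlike Prepare, PrepareIncremental does not drop existing data.
	if err := sw.PrepareIncremental(); err != nil {
		log.Fatal(err)
	}

	buf := z.NewBuffer(10<<20, "example") // github.com/dgraph-io/ristretto/z
	defer func() { _ = buf.Release() }()
	// ... serialize pb.KV entries into buf, e.g. via the tests' KVToBuffer helper ...

	if err := sw.Write(buf); err != nil {
		log.Fatal(err)
	}
	if err := sw.Flush(); err != nil { // finishes the tables, resumes compactions and writes
		log.Fatal(err)
	}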
1 change: 1 addition & 0 deletions badger/cmd/stream.go
@@ -118,6 +118,7 @@ func stream(cmd *cobra.Command, args []string) error {
 		WithValueDir(so.outDir).
 		WithNumVersionsToKeep(so.numVersions).
 		WithCompression(options.CompressionType(so.compressionType)).
+		WithEncryptionKey(encKey).
 		WithReadOnly(false)
 	err = inDB.StreamDB(outOpt)
 
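The badger stream subcommand copies one DB into another via DB.StreamDB(outOptions); before this one-liner, the output options never carried the source's encryption key, so an encrypted DB could not be streamed into an encrypted copy. A hedged sketch of the same idea at the library level (illustrative paths and key loading; as of v3, Badger also requires an index cache whenever encryption is enabled):

	key := make([]byte, 32) // AES key: 16, 24, or 32 bytes; load real key material here
	inDB, err := badger.Open(badger.DefaultOptions("/data/in").
		WithEncryptionKey(key).
		WithIndexCacheSize(100 << 20)) // index cache is mandatory with encryption
	if err != nil {
		log.Fatal(err)
	}
	defer inDB.Close()

	outOpt := badger.DefaultOptions("/data/out").
		WithEncryptionKey(key). // the equivalent of the line this PR adds
		WithIndexCacheSize(100 << 20)
	if err := inDB.StreamDB(outOpt); err != nil {
		log.Fatal(err)
	}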
2 changes: 1 addition & 1 deletion db.go
@@ -1623,7 +1623,7 @@ func (db *DB) prepareToDrop() (func(), error) {
 	// write it to db. Then, flush all the pending memtable. So that, we
 	// don't miss any entries.
 	if err := db.blockWrite(); err != nil {
-		return nil, err
+		return func() {}, err
 	}
 	reqs := make([]*request, 0, 10)
 	for {
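This db.go tweak supports the new error path in PrepareIncremental below: the cleanup returned by prepareToDrop is stored into sw.done and wrapped in sync.Once even when prepareToDrop fails, so returning nil would turn a later done() call into a panic. A minimal standalone sketch of that hazard (names are illustrative, not Badger code):

	package main

	import (
		"errors"
		"sync"
	)

	// prepare mimics prepareToDrop's contract after this PR: it returns a
	// callable no-op alongside the error, never a nil func.
	func prepare() (func(), error) {
		return func() {}, errors.New("blockWrite failed")
	}

	func main() {
		f, err := prepare()
		var once sync.Once
		done := func() { once.Do(f) } // once.Do(nil) would panic when invoked
		if err != nil {
			done() // safe only because f is a no-op rather than nil
		}
	}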
64 changes: 58 additions & 6 deletions stream_writer.go
@@ -48,6 +48,7 @@ type StreamWriter struct {
 	throttle   *y.Throttle
 	maxVersion uint64
 	writers    map[uint32]*sortedWriter
+	prevLevel  int
 }
 
 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -67,18 +68,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
+// in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
 func (sw *StreamWriter) Prepare() error {
 	sw.writeLock.Lock()
 	defer sw.writeLock.Unlock()
 
 	done, err := sw.db.dropAll()
 	// Ensure that done() is never called more than once.
 	var once sync.Once
 	sw.done = func() { once.Do(done) }
 	return err
 }
 
+// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
+// In incremental stream write, the tables are written at one level above the current base level.
+func (sw *StreamWriter) PrepareIncremental() error {
+	sw.writeLock.Lock()
+	defer sw.writeLock.Unlock()
+
+	// Ensure that done() is never called more than once.
+	var once sync.Once
+
+	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
+	// Before we start writing, we'll stop the compactions because no one else should be writing to
+	// the same level as the stream writer is writing to.
+	f, err := sw.db.prepareToDrop()
+	if err != nil {
+		sw.done = func() { once.Do(f) }
+		return err
+	}
+	sw.db.stopCompactions()
+	done := func() {
+		sw.db.startCompactions()
+		f()
+	}
+	sw.done = func() { once.Do(done) }
+
+	isEmptyDB := true
+	for _, level := range sw.db.Levels() {
+		if level.NumTables > 0 {
+			sw.prevLevel = level.Level
+			isEmptyDB = false
+			break
+		}
+	}
+	if isEmptyDB {
+		// If DB is empty, we should allow doing incremental stream write.
+		return nil
+	}
+	if sw.prevLevel == 0 {
+		return fmt.Errorf("Unable to do incremental writes because L0 has data")
+	}
+	return nil
+}
 
 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
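The level-picking rule above, restated as a standalone hypothetical helper (pickWriteLevel is illustrative, not part of the PR): scan levels from the top, remember the first non-empty one as prevLevel, and have the writers target prevLevel - 1. An empty DB falls through to the bottommost level, and data already sitting in L0 is rejected since there is no level above L0 to write into.

	package sketch

	import "errors"

	// pickWriteLevel mirrors PrepareIncremental's scan plus newWriter's
	// prevLevel-1 targeting. numTables[i] is the table count of level i.
	func pickWriteLevel(numTables []int) (int, error) {
		prevLevel, empty := 0, true
		for lvl, n := range numTables {
			if n > 0 {
				prevLevel, empty = lvl, false
				break
			}
		}
		if empty {
			// Empty DB: Write() later defaults prevLevel to len(levels),
			// so tables land on the bottommost level, as before this PR.
			prevLevel = len(numTables)
		} else if prevLevel == 0 {
			// No level above L0 exists to take the incremental tables.
			return 0, errors.New("unable to do incremental writes because L0 has data")
		}
		return prevLevel - 1, nil
	}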
@@ -110,16 +151,25 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
 			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
 		}
 
+		sw.writeLock.Lock()
+		if sw.maxVersion < kv.Version {
+			sw.maxVersion = kv.Version
+		}
+		if sw.prevLevel == 0 {
+			// If prevLevel is 0, that means that we have not written anything yet.
+			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
+			// so we can set prevLevel to len(levels).
+			sw.prevLevel = len(sw.db.lc.levels)
+		}
+		sw.writeLock.Unlock()
+
 		var meta, userMeta byte
 		if len(kv.Meta) > 0 {
 			meta = kv.Meta[0]
 		}
 		if len(kv.UserMeta) > 0 {
 			userMeta = kv.UserMeta[0]
 		}
-		if sw.maxVersion < kv.Version {
-			sw.maxVersion = kv.Version
-		}
 		e := &Entry{
 			Key:   y.KeyWithTs(kv.Key, kv.Version),
 			Value: y.Copy(kv.Value),
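Both maxVersion and the lazy prevLevel default are now mutated under writeLock: Write can be called concurrently (the stream framework may invoke it from several goroutines), so the previously unguarded maxVersion read-modify-write was racy. The default itself, with concrete numbers in a hypothetical helper (Badger's DefaultOptions sets MaxLevels to 7):

	// writeLevelAfterPlainPrepare shows where tables land when prevLevel
	// was never set by PrepareIncremental, assuming the stock 7 levels.
	func writeLevelAfterPlainPrepare() int {
		prevLevel := 0 // plain Prepare() leaves prevLevel unset
		if prevLevel == 0 {
			prevLevel = 7 // stand-in for len(sw.db.lc.levels)
		}
		return prevLevel - 1 // 6 == len(levels)-1, the old hard-coded target
	}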
@@ -285,6 +335,7 @@ type sortedWriter struct {
 
 	builder  *table.Builder
 	lastKey  []byte
+	level    int
 	streamID uint32
 	reqCh    chan *request
 	// Have separate closer for each writer, as it can be closed at any time.
@@ -304,6 +355,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
 		builder:  table.NewTableBuilder(bopts),
 		reqCh:    make(chan *request, 3),
 		closer:   z.NewCloser(1),
+		level:    sw.prevLevel - 1, // Write at the level just above the one we were writing to.
 	}
 
 	go w.handleRequests()
@@ -435,7 +487,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
 	}
 	lc := w.db.lc
 
-	lhandler := lc.levels[len(lc.levels)-1]
+	lhandler := lc.levels[w.level]
 	// Now that table can be opened successfully, let's add this to the MANIFEST.
 	change := &pb.ManifestChange{
 		Id: tbl.ID(),
99 changes: 99 additions & 0 deletions stream_writer_test.go
@@ -577,3 +577,102 @@ func TestStreamWriterEncrypted(t *testing.T) {
 	require.NoError(t, db.Close())
 
 }
+
+// Test that the stream writer does not crash with large values in managed mode.
+func TestStreamWriterWithLargeValue(t *testing.T) {
+	opts := DefaultOptions("")
+	opts.managedTxns = true
+	runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer func() { require.NoError(t, buf.Release()) }()
+		val := make([]byte, 10<<20)
+		_, err := rand.Read(val)
+		require.NoError(t, err)
+		KVToBuffer(&pb.KV{
+			Key:     []byte("key"),
+			Value:   val,
+			Version: 1,
+		}, buf)
+
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	})
+}
+
+func TestStreamWriterIncremental(t *testing.T) {
+	addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer func() { require.NoError(t, buf.Release()) }()
+		for _, key := range keys {
+			KVToBuffer(&pb.KV{
+				Key:     key,
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+		}
+		// Now do an incremental stream write.
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	}
+
+	t.Run("incremental on non-empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			buf := z.NewBuffer(10<<20, "test")
+			defer func() { require.NoError(t, buf.Release()) }()
+			KVToBuffer(&pb.KV{
+				Key:     []byte("key-1"),
+				Value:   []byte("val"),
+				Version: 1,
+			}, buf)
+			sw := db.NewStreamWriter()
+			require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+			require.NoError(t, sw.Write(buf), "sw.Write() failed")
+			require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+			addIncremental(t, db, [][]byte{[]byte("key-2")})
+
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("key-2"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("incremental on empty DB", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("key-1")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("key-1"))
+			require.NoError(t, err)
+		})
+	})
+
+	t.Run("multiple incremental", func(t *testing.T) {
+		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+			addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
+			addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
+			addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
+			txn := db.NewTransaction(false)
+			defer txn.Discard()
+			_, err := txn.Get([]byte("a1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c1"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c2"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("a3"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c3"))
+			require.NoError(t, err)
+		})
+	})
+}