Skip to content

Commit

Permalink
fix(stream): setup oracle correctly in stream writer (#1968) (#1904)
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 authored Jun 12, 2023
1 parent 1425208 commit 907dd65
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
4 changes: 2 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,8 +1573,8 @@ func (s *levelsController) close() error {
}

// get searches for a given key in all the levels of the LSM tree. It returns
// key version <= the expected version (maxVs). If not found, it returns an empty
// y.ValueStruct.
// key version <= the expected version (version in key). If not found,
// it returns an empty y.ValueStruct.
func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
y.ValueStruct, error) {
if s.kv.IsClosed() {
Expand Down
5 changes: 5 additions & 0 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func (sw *StreamWriter) Flush() error {
if sw.db.orc != nil {
sw.db.orc.Stop()
}

if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
sw.maxVersion = curMax
}

sw.db.orc = newOracle(sw.db.opt)
sw.db.orc.nextTxnTs = sw.maxVersion
sw.db.orc.txnMark.Done(sw.maxVersion)
Expand Down
47 changes: 47 additions & 0 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,4 +750,51 @@ func TestStreamWriterIncremental(t *testing.T) {
require.NoError(t, err)
})
})

t.Run("multiple incremental with older data first", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
buf := z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
KVToBuffer(&pb.KV{
Key: []byte("a1"),
Value: []byte("val1"),
Version: 11,
}, buf)
sw := db.NewStreamWriter()
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

buf = z.NewBuffer(10<<20, "test")
defer func() { require.NoError(t, buf.Release()) }()
KVToBuffer(&pb.KV{
Key: []byte("a2"),
Value: []byte("val2"),
Version: 9,
}, buf)
sw = db.NewStreamWriter()
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

// This will move the maxTs to 10 (earlier, without the fix)
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("a1"), []byte("val3"))
}))
// This will move the maxTs to 11 (earliler, without the fix)
require.NoError(t, db.Update(func(txn *Txn) error {
return txn.Set([]byte("a3"), []byte("val4"))
}))

// And now, the first write with val1 will resurface (without the fix)
require.NoError(t, db.View(func(txn *Txn) error {
item, err := txn.Get([]byte("a1"))
require.NoError(t, err)
val, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, "val3", string(val))
return nil
}))
})
})
}

0 comments on commit 907dd65

Please # to comment.