Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace original file and rebuilt index after merge #255

Merged
merged 9 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/merge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ func main() {

// then merge the data files
// all the invalid data will be removed, and the valid data will be merged into the new data files.
_ = db.Merge()
_ = db.Merge(true)
}
51 changes: 50 additions & 1 deletion merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"sync/atomic"

"github.com/rosedblabs/rosedb/v2/index"
"github.com/rosedblabs/wal"
)

Expand All @@ -23,7 +24,55 @@ const (
//
// The merge operation may be very time-consuming when the database is large.
// So it is recommended to perform this operation when the database is idle.
func (db *DB) Merge() error {
// If reopenAfterDone is true, the original file will be replaced by the merge file,
// and db's index will be rebuilt after the merge is complete.
func (db *DB) Merge(reopenAfterDone bool) error {
if err := db.merge(); err != nil {
roseduan marked this conversation as resolved.
Show resolved Hide resolved
return err
}

if !reopenAfterDone {
return nil
}

db.mu.Lock()
defer db.mu.Unlock()
roseduan marked this conversation as resolved.
Show resolved Hide resolved

// close current files
db.dataFiles.Close()

// replace original file
if err := loadMergeFiles(db.options.DirPath); err != nil {
return err
}

// open data files from WAL
walFiles, err := wal.Open(wal.Options{
DirPath: db.options.DirPath,
SegmentSize: db.options.SegmentSize,
SegmentFileExt: dataFileNameSuffix,
BlockCache: db.options.BlockCache,
Sync: db.options.Sync,
BytesPerSync: db.options.BytesPerSync,
})
if err != nil {
return err
}
db.dataFiles = walFiles

// rebuild index
db.index = index.NewIndexer()
if err := db.loadIndexFromHintFile(); err != nil {
return err
}
if err := db.loadIndexFromWAL(); err != nil {
return err
}

return nil
}

func (db *DB) merge() error {
db.mu.Lock()
// check if the database is closed
if db.closed {
Expand Down
88 changes: 80 additions & 8 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rosedb

import (
"math/rand"
"os"
"sync"
"testing"

Expand All @@ -15,7 +16,7 @@ func TestDB_Merge_1_Empty(t *testing.T) {
assert.Nil(t, err)
defer destroyDB(db)

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)
}

Expand All @@ -34,7 +35,7 @@ func TestDB_Merge_2_All_Invalid(t *testing.T) {
assert.Nil(t, err)
}

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)

_ = db.Close()
Expand All @@ -59,7 +60,7 @@ func TestDB_Merge_3_All_Valid(t *testing.T) {
assert.Nil(t, err)
}

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)

_ = db.Close()
Expand Down Expand Up @@ -87,9 +88,9 @@ func TestDB_Merge_4_Twice(t *testing.T) {
assert.Nil(t, err)
}

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)
err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)

_ = db.Close()
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestDB_Merge_5_Mixed(t *testing.T) {
assert.Nil(t, err)
}

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)

_ = db.Close()
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestDB_Merge_6_Appending(t *testing.T) {
}()
}

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -215,7 +216,7 @@ func TestDB_Multi_Open_Merge(t *testing.T) {
assert.Nil(t, err)
}

err = db.Merge()
err = db.Merge(false)
assert.Nil(t, err)
err = db.Close()
assert.Nil(t, err)
Expand All @@ -231,3 +232,74 @@ func TestDB_Multi_Open_Merge(t *testing.T) {
}
assert.Equal(t, len(kvs), db.index.Size())
}

func TestDB_Merge_ReopenAfterDone(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

kvs := make(map[string][]byte)
for i := 0; i < 10000; i++ {
Jeremy-Run marked this conversation as resolved.
Show resolved Hide resolved
Jeremy-Run marked this conversation as resolved.
Show resolved Hide resolved
key := utils.GetTestKey(i)
value := utils.RandomValue(128)
kvs[string(key)] = value
err := db.Put(key, value)
assert.Nil(t, err)
}

err = db.Merge(true)
assert.Nil(t, err)
_, err = os.Stat(mergeDirPath(options.DirPath))
assert.Equal(t, true, os.IsNotExist(err))

for key, value := range kvs {
v, err := db.Get([]byte(key))
assert.Nil(t, err)
assert.Equal(t, value, v)
}
assert.Equal(t, len(kvs), db.index.Size())
}

// TestDB_Merge_Concurrent_Put starts ten writer goroutines, each issuing 10000
// Puts with random keys, alongside one goroutine that calls Merge(true). Once
// all of them finish it checks that the path returned by mergeDirPath no longer
// exists, that every recorded key reads back its recorded value, and that the
// index size equals the number of recorded keys.
func TestDB_Merge_Concurrent_Put(t *testing.T) {
	opts := DefaultOptions
	db, err := Open(opts)
	assert.Nil(t, err)
	defer destroyDB(db)

	const (
		writers       = 10
		putsPerWriter = 10000
	)

	var (
		wg      sync.WaitGroup
		written sync.Map
	)
	// One slot per writer plus one for the merging goroutine.
	wg.Add(writers + 1)

	putRandom := func() {
		defer wg.Done()
		for n := 0; n < putsPerWriter; n++ {
			k := utils.GetTestKey(rand.Int())
			val := utils.RandomValue(128)
			written.Store(string(k), val)
			assert.Nil(t, db.Put(k, val))
		}
	}
	for w := 0; w < writers; w++ {
		go putRandom()
	}

	go func() {
		defer wg.Done()
		assert.Nil(t, db.Merge(true))
	}()
	wg.Wait()

	_, statErr := os.Stat(mergeDirPath(opts.DirPath))
	assert.Equal(t, true, os.IsNotExist(statErr))

	entries := 0
	written.Range(func(k, want any) bool {
		got, getErr := db.Get([]byte(k.(string)))
		assert.Nil(t, getErr)
		assert.Equal(t, want, got)
		entries++
		return true
	})
	assert.Equal(t, entries, db.index.Size())
}