diff --git a/db.go b/db.go
index 2665caeb..73f7dfca 100644
--- a/db.go
+++ b/db.go
@@ -93,28 +93,24 @@ func Open(options Options) (*DB, error) {
 		return nil, err
 	}
 
-	// open data files from WAL
-	walFiles, err := wal.Open(wal.Options{
-		DirPath:        options.DirPath,
-		SegmentSize:    options.SegmentSize,
-		SegmentFileExt: dataFileNameSuffix,
-		BlockCache:     options.BlockCache,
-		Sync:           options.Sync,
-		BytesPerSync:   options.BytesPerSync,
-	})
-	if err != nil {
-		return nil, err
-	}
-
 	// init DB instance
 	db := &DB{
-		dataFiles: walFiles,
 		index:     index.NewIndexer(),
 		options:   options,
 		fileLock:  fileLock,
 		batchPool: sync.Pool{New: makeBatch},
 	}
 
+	// open data files
+	if db.dataFiles, err = db.openWalFiles(); err != nil {
+		return nil, err
+	}
+
+	// load index
+	if err = db.loadIndex(); err != nil {
+		return nil, err
+	}
+
 	// enable watch
 	if options.WatchQueueSize > 0 {
 		db.watchCh = make(chan *Event, 100)
@@ -123,17 +119,37 @@ func Open(options Options) (*DB, error) {
 		go db.watcher.sendEvent(db.watchCh)
 	}
 
-	// load index frm hint file
-	if err = db.loadIndexFromHintFile(); err != nil {
+	return db, nil
+}
+
+// openWalFiles opens the WAL that holds the data files, configured from db.options.
+func (db *DB) openWalFiles() (*wal.WAL, error) {
+	// open data files from WAL
+	walFiles, err := wal.Open(wal.Options{
+		DirPath:        db.options.DirPath,
+		SegmentSize:    db.options.SegmentSize,
+		SegmentFileExt: dataFileNameSuffix,
+		BlockCache:     db.options.BlockCache,
+		Sync:           db.options.Sync,
+		BytesPerSync:   db.options.BytesPerSync,
+	})
+	if err != nil {
 		return nil, err
 	}
+	return walFiles, nil
+}
 
+// loadIndex loads the index from the hint file first, then from the data files.
+func (db *DB) loadIndex() error {
+	// load index from hint file
+	if err := db.loadIndexFromHintFile(); err != nil {
+		return err
+	}
 	// load index from data files
-	if err = db.loadIndexFromWAL(); err != nil {
-		return nil, err
+	if err := db.loadIndexFromWAL(); err != nil {
+		return err
 	}
-
-	return db, nil
+	return nil
 }
 
 // Close the database, close all data files and release file lock.
@@ -143,16 +159,10 @@ func (db *DB) Close() error {
 	db.mu.Lock()
 	defer db.mu.Unlock()
 
-	// close wal
-	if err := db.dataFiles.Close(); err != nil {
+	if err := db.closeFiles(); err != nil {
 		return err
 	}
-	// close hint file if exists
-	if db.hintFile != nil {
-		if err := db.hintFile.Close(); err != nil {
-			return err
-		}
-	}
+
 	// release file lock
 	if err := db.fileLock.Unlock(); err != nil {
 		return err
@@ -167,6 +177,21 @@ func (db *DB) Close() error {
 	return nil
 }
 
+// closeFiles closes all data files and the hint file.
+func (db *DB) closeFiles() error {
+	// close wal
+	if err := db.dataFiles.Close(); err != nil {
+		return err
+	}
+	// close hint file if exists
+	if db.hintFile != nil {
+		if err := db.hintFile.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // Sync all data files to the underlying storage.
 func (db *DB) Sync() error {
 	db.mu.Lock()
diff --git a/examples/merge/main.go b/examples/merge/main.go
index fa911330..102ff9c9 100644
--- a/examples/merge/main.go
+++ b/examples/merge/main.go
@@ -34,5 +34,5 @@ func main() {
 
 	// then merge the data files
 	// all the invalid data will be removed, and the valid data will be merged into the new data files.
-	_ = db.Merge()
+	_ = db.Merge(true)
 }
diff --git a/merge.go b/merge.go
index 4def9c89..2c12186b 100644
--- a/merge.go
+++ b/merge.go
@@ -23,7 +23,48 @@ const (
 //
 // Merge operation maybe a very time-consuming operation when the database is large.
 // So it is recommended to perform this operation when the database is idle.
-func (db *DB) Merge() error {
+// If reopenAfterDone is true, the original files are replaced by the merged files
+// and the db's index is rebuilt once the merge has completed.
+func (db *DB) Merge(reopenAfterDone bool) error {
+	if err := db.doMerge(); err != nil {
+		return err
+	}
+
+	if !reopenAfterDone {
+		return nil
+	}
+
+	db.mu.Lock()
+	defer db.mu.Unlock()
+	// NOTE(review): the lock is not held between doMerge returning and this point,
+	// so the db may have been closed meanwhile; consider re-checking db.closed here.
+
+	// close current files; a failed close must not be ignored, because the merged
+	// files are about to replace the ones that would still be open.
+	if err := db.closeFiles(); err != nil {
+		return err
+	}
+
+	// replace original file
+	err := loadMergeFiles(db.options.DirPath)
+	if err != nil {
+		return err
+	}
+
+	// open data files
+	if db.dataFiles, err = db.openWalFiles(); err != nil {
+		return err
+	}
+
+	// rebuild index
+	if err = db.loadIndex(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (db *DB) doMerge() error {
 	db.mu.Lock()
 	// check if the database is closed
 	if db.closed {
diff --git a/merge_test.go b/merge_test.go
index e82d5f57..77714ef1 100644
--- a/merge_test.go
+++ b/merge_test.go
@@ -2,6 +2,7 @@ package rosedb
 
 import (
 	"math/rand"
+	"os"
 	"sync"
 	"testing"
 
@@ -15,7 +16,7 @@ func TestDB_Merge_1_Empty(t *testing.T) {
 	assert.Nil(t, err)
 	defer destroyDB(db)
 
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
 }
 
@@ -34,7 +35,7 @@ func TestDB_Merge_2_All_Invalid(t *testing.T) {
 		assert.Nil(t, err)
 	}
 
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
 
 	_ = db.Close()
@@ -59,7 +60,7 @@ func TestDB_Merge_3_All_Valid(t *testing.T) {
 		assert.Nil(t, err)
 	}
 
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
 
 	_ = db.Close()
@@ -87,9 +88,9 @@ func TestDB_Merge_4_Twice(t *testing.T) {
 		assert.Nil(t, err)
 	}
 
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
 
 	_ = db.Close()
@@ -129,7 +130,7 @@ func TestDB_Merge_5_Mixed(t *testing.T) {
 		assert.Nil(t, err)
 	}
 
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
 
 	_ = db.Close()
@@ -180,7 +181,7 @@ func TestDB_Merge_6_Appending(t *testing.T) {
 		}()
 	}
 
-	err = db.Merge()
+	err = db.Merge(false)
 	assert.Nil(t, err)
 
 	wg.Wait()
@@ -215,7 +216,7 @@ func TestDB_Multi_Open_Merge(t *testing.T) {
 			assert.Nil(t, err)
 		}
 
-		err = db.Merge()
+		err = db.Merge(false)
 		assert.Nil(t, err)
 		err = db.Close()
 		assert.Nil(t, err)
@@ -231,3 +232,74 @@ func TestDB_Multi_Open_Merge(t *testing.T) {
 	}
 	assert.Equal(t, len(kvs), db.index.Size())
 }
+
+func TestDB_Merge_ReopenAfterDone(t *testing.T) {
+	options := DefaultOptions
+	db, err := Open(options)
+	assert.Nil(t, err)
+	defer destroyDB(db)
+
+	kvs := make(map[string][]byte)
+	for i := 0; i < 200000; i++ {
+		key := utils.GetTestKey(i)
+		value := utils.RandomValue(128)
+		kvs[string(key)] = value
+		err := db.Put(key, value)
+		assert.Nil(t, err)
+	}
+
+	err = db.Merge(true)
+	assert.Nil(t, err)
+	_, err = os.Stat(mergeDirPath(options.DirPath))
+	assert.Equal(t, true, os.IsNotExist(err))
+
+	for key, value := range kvs {
+		v, err := db.Get([]byte(key))
+		assert.Nil(t, err)
+		assert.Equal(t, value, v)
+	}
+	assert.Equal(t, len(kvs), db.index.Size())
+}
+
+func TestDB_Merge_Concurrent_Put(t *testing.T) {
+	options := DefaultOptions
+	db, err := Open(options)
+	assert.Nil(t, err)
+	defer destroyDB(db)
+
+	wg := sync.WaitGroup{}
+	m := sync.Map{}
+	wg.Add(11)
+	for i := 0; i < 10; i++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 10000; i++ {
+				key := utils.GetTestKey(rand.Int())
+				value := utils.RandomValue(128)
+				m.Store(string(key), value)
+				e := db.Put(key, value)
+				assert.Nil(t, e)
+			}
+		}()
+	}
+	go func() {
+		defer wg.Done()
+		err = db.Merge(true)
+		assert.Nil(t, err)
+	}()
+	wg.Wait()
+
+	_, err = os.Stat(mergeDirPath(options.DirPath))
+	assert.Equal(t, true, os.IsNotExist(err))
+
+	var count int
+	m.Range(func(key, value any) bool {
+		v, err := db.Get([]byte(key.(string)))
+		assert.Nil(t, err)
+		assert.Equal(t, value, v)
+		count++
+		return true
+	})
+	assert.Equal(t, count, db.index.Size())
+
+}