diff --git a/go.mod b/go.mod index 7dbfbcb..4e454d7 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,9 @@ require ( ) require ( + github.com/DataDog/zstd v1.5.5 github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/snappy v0.0.4 github.com/hashicorp/golang-lru/v2 v2.0.2 github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 7b72037..23fc2ac 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ +github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= +github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU= github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/options.go b/options.go index 9c1f27d..d0865ed 100644 --- a/options.go +++ b/options.go @@ -34,6 +34,9 @@ type Options struct { // BytesPerSync specifies the number of bytes to write before calling fsync. BytesPerSync uint32 + + // Compressor specifies the compressor type. + Compressor CompressorType } const ( @@ -43,6 +46,14 @@ const ( GB = 1024 * MB ) +type CompressorType uint8 + +const ( + None CompressorType = iota + Snappy + Zstd +) + var DefaultOptions = Options{ DirPath: os.TempDir(), SegmentSize: GB, @@ -50,4 +61,5 @@ var DefaultOptions = Options{ BlockCache: 32 * KB * 10, Sync: false, BytesPerSync: 0, + Compressor: Snappy, } diff --git a/wal.go b/wal.go index 5e27e49..dbb8e17 100644 --- a/wal.go +++ b/wal.go @@ -10,6 +10,8 @@ import ( "strings" "sync" + "github.com/DataDog/zstd" + "github.com/golang/snappy" lru "github.com/hashicorp/golang-lru/v2" ) @@ -58,6 +60,7 @@ type WAL struct { type Reader struct { segmentReaders []*segmentReader currentReader int + Compressor CompressorType } // Open opens a WAL with the given options. @@ -219,6 +222,7 @@ func (wal *WAL) NewReaderWithMax(segId SegmentID) *Reader { return &Reader{ segmentReaders: segmentReaders, currentReader: 0, + Compressor: wal.options.Compressor, } } @@ -276,6 +280,20 @@ func (r *Reader) Next() ([]byte, *ChunkPosition, error) { r.currentReader++ return r.Next() } + switch r.Compressor { + case Snappy: + data, err = snappy.Decode(nil, data) + if err != nil { + return nil, nil, err + } + case Zstd: + deCompressedData, err := zstd.Decompress(nil, data) + if err != nil { + return nil, nil, err + } + data = deCompressedData + } + return data, position, err } @@ -313,8 +331,6 @@ func (wal *WAL) ClearPendingWrites() { } // PendingWrites add data to wal.pendingWrites and wait for batch write. -// If the data in pendingWrites exceeds the size of one segment, -// it will return a 'ErrPendingSizeTooLarge' error and clear the pendingWrites. func (wal *WAL) PendingWrites(data []byte) { wal.pendingWritesLock.Lock() defer wal.pendingWritesLock.Unlock() @@ -342,6 +358,8 @@ func (wal *WAL) rotateActiveSegment() error { // WriteAll write wal.pendingWrites to WAL and then clear pendingWrites, // it will not sync the segment file based on wal.options, you should call Sync() manually. +// If the data in pendingWrites exceeds the size of one segment, +// it will return a 'ErrPendingSizeTooLarge' error and clear the pendingWrites. func (wal *WAL) WriteAll() ([]*ChunkPosition, error) { if len(wal.pendingWrites) == 0 { return make([]*ChunkPosition, 0), nil @@ -380,9 +398,22 @@ func (wal *WAL) WriteAll() ([]*ChunkPosition, error) { func (wal *WAL) Write(data []byte) (*ChunkPosition, error) { wal.mu.Lock() defer wal.mu.Unlock() + + switch wal.options.Compressor { + case Snappy: + data = snappy.Encode(nil, data) + case Zstd: + CompressedData, err := zstd.Compress(nil, data) + if err != nil { + return nil, err + } + data = CompressedData + } + if int64(len(data))+chunkHeaderSize > wal.options.SegmentSize { return nil, ErrValueTooLarge } + // if the active segment file is full, sync it and create a new one. if wal.isFull(int64(len(data))) { if err := wal.rotateActiveSegment(); err != nil { @@ -432,7 +463,25 @@ func (wal *WAL) Read(pos *ChunkPosition) ([]byte, error) { } // read the data from the segment file. - return segment.Read(pos.BlockNumber, pos.ChunkOffset) + data, err := segment.Read(pos.BlockNumber, pos.ChunkOffset) + if err != nil { + return nil, err + } + switch wal.options.Compressor { + case Snappy: + data, err = snappy.Decode(nil, data) + if err != nil { + return nil, err + } + case Zstd: + deCompressedData, err := zstd.Decompress(nil, data) + if err != nil { + return nil, err + } + data = deCompressedData + } + + return data, nil } // Close closes the WAL. diff --git a/wal_test.go b/wal_test.go index 682f921..9687550 100644 --- a/wal_test.go +++ b/wal_test.go @@ -17,174 +17,287 @@ func destroyWAL(wal *WAL) { } func TestWAL_WriteALL(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-write-batch-1") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-write-batch-1") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - testWriteAllIterate(t, wal, 0, 10) - assert.True(t, wal.IsEmpty()) + testWriteAllIterate(t, wal, 0, 10) + assert.True(t, wal.IsEmpty()) - testWriteAllIterate(t, wal, 10000, 512) - assert.False(t, wal.IsEmpty()) + testWriteAllIterate(t, wal, 10000, 512) + assert.False(t, wal.IsEmpty()) + } } func TestWAL_Write(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-write1") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-write1") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - // write 1 - pos1, err := wal.Write([]byte("hello1")) - assert.Nil(t, err) - assert.NotNil(t, pos1) - pos2, err := wal.Write([]byte("hello2")) - assert.Nil(t, err) - assert.NotNil(t, pos2) - pos3, err := wal.Write([]byte("hello3")) - assert.Nil(t, err) - assert.NotNil(t, pos3) + // write 1 + pos1, err := wal.Write([]byte("hello1")) + assert.Nil(t, err) + assert.NotNil(t, pos1) + pos2, err := wal.Write([]byte("hello2")) + assert.Nil(t, err) + assert.NotNil(t, pos2) + pos3, err := wal.Write([]byte("hello3")) + assert.Nil(t, err) + assert.NotNil(t, pos3) + + val, err := wal.Read(pos1) + assert.Nil(t, err) + assert.Equal(t, "hello1", string(val)) + val, err = wal.Read(pos2) + assert.Nil(t, err) + assert.Equal(t, "hello2", string(val)) + val, err = wal.Read(pos3) + assert.Nil(t, err) + assert.Equal(t, "hello3", string(val)) + } - val, err := wal.Read(pos1) - assert.Nil(t, err) - assert.Equal(t, "hello1", string(val)) - val, err = wal.Read(pos2) - assert.Nil(t, err) - assert.Equal(t, "hello2", string(val)) - val, err = wal.Read(pos3) - assert.Nil(t, err) - assert.Equal(t, "hello3", string(val)) } func TestWAL_Write_large(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-write2") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-write2") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - testWriteAndIterate(t, wal, 100000, 512) + testWriteAndIterate(t, wal, 100000, 512) + } } func TestWAL_Write_large2(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-write3") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-write3") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - testWriteAndIterate(t, wal, 2000, 32*1024*3+10) + testWriteAndIterate(t, wal, 2000, 32*1024*3+10) + } } func TestWAL_OpenNewActiveSegment(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-new-active-segment") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) - - testWriteAndIterate(t, wal, 2000, 512) - err = wal.OpenNewActiveSegment() - assert.Nil(t, err) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-new-active-segment") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - val := strings.Repeat("wal", 100) - for i := 0; i < 100; i++ { - pos, err := wal.Write([]byte(val)) + testWriteAndIterate(t, wal, 2000, 512) + err = wal.OpenNewActiveSegment() assert.Nil(t, err) - assert.NotNil(t, pos) + + val := strings.Repeat("wal", 100) + for i := 0; i < 100; i++ { + pos, err := wal.Write([]byte(val)) + assert.Nil(t, err) + assert.NotNil(t, pos) + } } } func TestWAL_IsEmpty(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-is-empty") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-is-empty") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - assert.True(t, wal.IsEmpty()) - testWriteAndIterate(t, wal, 2000, 512) - assert.False(t, wal.IsEmpty()) + assert.True(t, wal.IsEmpty()) + testWriteAndIterate(t, wal, 2000, 512) + assert.False(t, wal.IsEmpty()) + } } func TestWAL_Reader(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-wal-reader") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) - - var size = 100000 - val := strings.Repeat("wal", 512) - for i := 0; i < size; i++ { - _, err := wal.Write([]byte(val)) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-wal-reader") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) assert.Nil(t, err) - } + defer destroyWAL(wal) + + var size = 100000 + val := strings.Repeat("wal", 512) + for i := 0; i < size; i++ { + _, err := wal.Write([]byte(val)) + assert.Nil(t, err) + } - validate := func(walInner *WAL, size int) { - var i = 0 - reader := walInner.NewReader() - for { - chunk, position, err := reader.Next() - if err != nil { - if err == io.EOF { - break + validate := func(walInner *WAL, size int) { + var i = 0 + reader := walInner.NewReader() + for { + chunk, position, err := reader.Next() + if err != nil { + if err == io.EOF { + break + } + panic(err) } - panic(err) + assert.NotNil(t, chunk) + assert.NotNil(t, position) + assert.Equal(t, position.SegmentId, reader.CurrentSegmentId()) + i++ } - assert.NotNil(t, chunk) - assert.NotNil(t, position) - assert.Equal(t, position.SegmentId, reader.CurrentSegmentId()) - i++ + assert.Equal(t, i, size) } - assert.Equal(t, i, size) - } - validate(wal, size) - err = wal.Close() - assert.Nil(t, err) + validate(wal, size) + err = wal.Close() + assert.Nil(t, err) - wal2, err := Open(opts) - assert.Nil(t, err) - defer func() { - _ = wal2.Close() - }() - validate(wal2, size) + wal2, err := Open(opts) + assert.Nil(t, err) + defer func() { + _ = wal2.Close() + }() + validate(wal2, size) + } } func testWriteAllIterate(t *testing.T, wal *WAL, size, valueSize int) { @@ -243,90 +356,140 @@ func testWriteAndIterate(t *testing.T, wal *WAL, size int, valueSize int) { } func TestWAL_Delete(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-delete") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 32 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - testWriteAndIterate(t, wal, 2000, 512) - assert.False(t, wal.IsEmpty()) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-delete") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 32 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + testWriteAndIterate(t, wal, 2000, 512) + assert.False(t, wal.IsEmpty()) + defer destroyWAL(wal) - err = wal.Delete() - assert.Nil(t, err) + err = wal.Delete() + assert.Nil(t, err) - wal, err = Open(opts) - assert.Nil(t, err) - assert.True(t, wal.IsEmpty()) + wal, err = Open(opts) + assert.Nil(t, err) + assert.True(t, wal.IsEmpty()) + } } func TestWAL_ReaderWithStart(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-wal-reader-with-start") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".SEG", - SegmentSize: 8 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-wal-reader-with-start") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".SEG", + SegmentSize: 8 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) - _, err = wal.NewReaderWithStart(nil) - assert.NotNil(t, err) + _, err = wal.NewReaderWithStart(nil) + assert.NotNil(t, err) - reader1, err := wal.NewReaderWithStart(&ChunkPosition{SegmentId: 0, BlockNumber: 0, ChunkOffset: 100}) - assert.Nil(t, err) - _, _, err = reader1.Next() - assert.Equal(t, err, io.EOF) + reader1, err := wal.NewReaderWithStart(&ChunkPosition{SegmentId: 0, BlockNumber: 0, ChunkOffset: 100}) + assert.Nil(t, err) + _, _, err = reader1.Next() + assert.Equal(t, err, io.EOF) - testWriteAndIterate(t, wal, 20000, 512) - reader2, err := wal.NewReaderWithStart(&ChunkPosition{SegmentId: 0, BlockNumber: 0, ChunkOffset: 0}) - assert.Nil(t, err) - _, pos2, err := reader2.Next() - assert.Nil(t, err) - assert.Equal(t, pos2.BlockNumber, uint32(0)) - assert.Equal(t, pos2.ChunkOffset, int64(0)) + testWriteAndIterate(t, wal, 500000, 512) + reader2, err := wal.NewReaderWithStart(&ChunkPosition{SegmentId: 0, BlockNumber: 0, ChunkOffset: 0}) + assert.Nil(t, err) + _, pos2, err := reader2.Next() + assert.Nil(t, err) + assert.Equal(t, pos2.BlockNumber, uint32(0)) + assert.Equal(t, pos2.ChunkOffset, int64(0)) + + reader3, err := wal.NewReaderWithStart(&ChunkPosition{SegmentId: 1, BlockNumber: 5, ChunkOffset: 0}) + assert.Nil(t, err) + _, pos3, err := reader3.Next() + assert.Nil(t, err) + assert.Equal(t, pos3.SegmentId, uint32(1)) + assert.Equal(t, pos3.BlockNumber, uint32(5)) + } - reader3, err := wal.NewReaderWithStart(&ChunkPosition{SegmentId: 3, BlockNumber: 5, ChunkOffset: 0}) - assert.Nil(t, err) - _, pos3, err := reader3.Next() - assert.Nil(t, err) - assert.Equal(t, pos3.SegmentId, uint32(3)) - assert.Equal(t, pos3.BlockNumber, uint32(5)) } func TestWAL_RenameFileExt(t *testing.T) { - dir, _ := os.MkdirTemp("", "wal-test-rename-ext") - opts := Options{ - DirPath: dir, - SegmentFileExt: ".VLOG.1.temp", - SegmentSize: 8 * 1024 * 1024, - BlockCache: 32 * KB * 10, + tests := []struct { + compressor CompressorType + }{ + { + None, + }, + { + Snappy, + }, + { + Zstd, + }, } - wal, err := Open(opts) - assert.Nil(t, err) - defer destroyWAL(wal) - testWriteAndIterate(t, wal, 20000, 512) + for _, tt := range tests { + dir, _ := os.MkdirTemp("", "wal-test-rename-ext") + opts := Options{ + DirPath: dir, + SegmentFileExt: ".VLOG.1.temp", + SegmentSize: 8 * 1024 * 1024, + BlockCache: 32 * KB * 10, + Compressor: tt.compressor, + } + wal, err := Open(opts) + assert.Nil(t, err) + defer destroyWAL(wal) + testWriteAndIterate(t, wal, 20000, 512) - err = wal.Close() - assert.Nil(t, err) + err = wal.Close() + assert.Nil(t, err) - err = wal.RenameFileExt(".VLOG.1") - assert.Nil(t, err) + err = wal.RenameFileExt(".VLOG.1") + assert.Nil(t, err) - opts.SegmentFileExt = ".VLOG.1" - wal2, err := Open(opts) - assert.Nil(t, err) - defer func() { - _ = wal2.Close() - }() - for i := 0; i < 20000; i++ { - _, err = wal2.Write([]byte(strings.Repeat("W", 512))) + opts.SegmentFileExt = ".VLOG.1" + wal2, err := Open(opts) assert.Nil(t, err) + defer func() { + _ = wal2.Close() + }() + for i := 0; i < 20000; i++ { + _, err = wal2.Write([]byte(strings.Repeat("W", 512))) + assert.Nil(t, err) + } } + }