diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f1c927 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea/ +.vscode/ +.DS_Store diff --git a/README.md b/README.md index e2198e0..a2fe367 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Write Ahead Log for LSM or bitcask storage, with block cache. ## Design Overview -![](https://img-blog.csdnimg.cn/3910507c20a04f9190c3664e3657a4b1.png#pic_center) +![wal-logo.png](https://s2.loli.net/2024/05/19/Iz4JML2SokFdrfN.png) ## Format diff --git a/go.mod b/go.mod index 53166d7..dd7387a 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/rosedblabs/wal -go 1.19 +go 1.21 require ( - github.com/stretchr/testify v1.8.3 + github.com/stretchr/testify v1.9.0 github.com/valyala/bytebufferpool v1.0.0 ) diff --git a/go.sum b/go.sum index 7b72037..b5da196 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU= -github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= -github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/segment.go b/segment.go index 68d59b1..a7e0db7 100644 --- a/segment.go +++ b/segment.go @@ -4,12 +4,10 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/valyala/bytebufferpool" "hash/crc32" "io" "os" - "sync" - - "github.com/valyala/bytebufferpool" ) type ChunkType = byte @@ -53,7 +51,7 @@ type segment struct { currentBlockSize uint32 closed bool header []byte - blockPool sync.Pool + cachedBlock *blockAndHeader } // segmentReader is used to iterate all the data from the segment file. @@ -67,8 +65,9 @@ type segmentReader struct { // block and chunk header, saved in pool. type blockAndHeader struct { - block []byte - header []byte + block []byte + header []byte + blockNumber int64 } // ChunkPosition represents the position of a chunk in a segment file. @@ -101,23 +100,23 @@ func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) { panic(fmt.Errorf("seek to the end of segment file %d%s failed: %v", id, extName, err)) } + // init cached block + bh := &blockAndHeader{ + block: make([]byte, blockSize), + header: make([]byte, chunkHeaderSize), + blockNumber: -1, + } + return &segment{ id: id, fd: fd, header: make([]byte, chunkHeaderSize), - blockPool: sync.Pool{New: newBlockAndHeader}, currentBlockNumber: uint32(offset / blockSize), currentBlockSize: uint32(offset % blockSize), + cachedBlock: bh, }, nil } -func newBlockAndHeader() interface{} { - return &blockAndHeader{ - block: make([]byte, blockSize), - header: make([]byte, chunkHeaderSize), - } -} - // NewReader creates a new segment reader. // You can call Next to get the next chunk data, // and io.EOF will be returned when there is no data. @@ -356,6 +355,8 @@ func (seg *segment) writeChunkBuffer(buf *bytebufferpool.ByteBuffer) error { return err } + // the cached block can not be reused again after writes. + seg.cachedBlock.blockNumber = -1 return nil } @@ -372,13 +373,10 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte, var ( result []byte - bh = seg.blockPool.Get().(*blockAndHeader) + bh = seg.cachedBlock segSize = seg.Size() nextChunk = &ChunkPosition{SegmentId: seg.id} ) - defer func() { - seg.blockPool.Put(bh) - }() for { size := int64(blockSize) @@ -391,10 +389,18 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte, return nil, nil, io.EOF } - // cache miss, read block from the segment file - _, err := seg.fd.ReadAt(bh.block[0:size], offset) - if err != nil { - return nil, nil, err + // There are two cases that we should read block from file: + // 1. the acquired block is not the cached one + // 2. new writes appended to the block, and the block + // is still smaller than 32KB, we must read it again because of the new writes. + if seg.cachedBlock.blockNumber != int64(blockNumber) || size != blockSize { + // read block from segment file at the specified offset. + _, err := seg.fd.ReadAt(bh.block[0:size], offset) + if err != nil { + return nil, nil, err + } + // remember the block + bh.blockNumber = int64(blockNumber) } // header diff --git a/wal.go b/wal.go index 359adc3..a8bd55c 100644 --- a/wal.go +++ b/wal.go @@ -30,10 +30,6 @@ var ( // // The mu sync.RWMutex is used for concurrent access to the WAL data structure, // ensuring safe access and modification. -// -// The blockCache is an LRU cache used to store recently accessed data blocks, -// improving read performance by reducing disk I/O. -// It is implemented using a lru.Cache structure with keys of type uint64 and values of type []byte. type WAL struct { activeSegment *segment // active segment file, used for new incoming writes. olderSegments map[SegmentID]*segment // older segment files, only used for read.