diff --git a/examples/prefer_writer/main.go b/examples/prefer_writer/main.go index 7e8b4bf..0f5a95a 100644 --- a/examples/prefer_writer/main.go +++ b/examples/prefer_writer/main.go @@ -1,12 +1,13 @@ package main import ( + "context" "fmt" "sync" "time" rwlock "github.com/e-chip/redis-rwlock" - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" ) const ( @@ -17,6 +18,7 @@ const ( ) type example struct { + ctx context.Context locker rwlock.Locker wg sync.WaitGroup doneC chan struct{} @@ -26,7 +28,7 @@ func (e *example) WriteSharedData(sharedData *int) { e.wg.Add(1) go func() { for i := 0; i < writeIterations; i++ { - err := e.locker.Write(func() { + err := e.locker.Write(e.ctx, func() { fmt.Printf("Writing...\n") time.Sleep(writeDuration) (*sharedData)++ @@ -51,7 +53,7 @@ func (e *example) ReadSharedData(sharedData *int) { e.wg.Done() return default: - err := e.locker.Read(func() { + err := e.locker.Read(e.ctx, func() { fmt.Printf("Read: %d\n", *sharedData) }) if err != nil { @@ -75,6 +77,7 @@ func main() { DB: 9, }) example = example{ + ctx: context.Background(), locker: rwlock.New(redisClient, "GLOBAL_LOCK", "READER_COUNT", "WRITER_INTENT", &rwlock.Options{}), doneC: make(chan struct{}), } diff --git a/go.mod b/go.mod index b69766f..fd5d739 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,13 @@ module github.com/e-chip/redis-rwlock -go 1.13 +go 1.17 require ( - github.com/go-redis/redis v6.15.9+incompatible github.com/gofrs/uuid v3.3.0+incompatible + github.com/redis/go-redis/v9 v9.2.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/go.sum b/go.sum index 37e8796..d68c4f9 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,32 @@ -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ= +github.com/bsm/ginkgo/v2 v2.5.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8= +github.com/bsm/gomega v1.20.0/go.mod h1:JifAceMQ4crZIWYUKrlGcmbN3bqHogVTADMD2ATsbwk= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE= +github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/rwlock/rwlock.go b/pkg/rwlock/rwlock.go index 6995bec..5ba4d65 100644 --- a/pkg/rwlock/rwlock.go +++ b/pkg/rwlock/rwlock.go @@ -1,12 +1,13 @@ package rwlock import ( + "context" "errors" "strconv" "time" - "github.com/go-redis/redis" "github.com/gofrs/uuid" + "github.com/redis/go-redis/v9" ) var ( @@ -25,9 +26,9 @@ var ( // Locker allows to execute given functions at reader or writer access privilege. type Locker interface { // Read executes given function with shared reader access. - Read(fn func()) error + Read(ctx context.Context, fn func()) error // Write executes given function with unique writer access. - Write(fn func()) error + Write(ctx context.Context, fn func()) error } // New instance of RW-Locker. diff --git a/pkg/rwlock/rwlock_impl.go b/pkg/rwlock/rwlock_impl.go index ccb5092..5ace376 100644 --- a/pkg/rwlock/rwlock_impl.go +++ b/pkg/rwlock/rwlock_impl.go @@ -1,11 +1,12 @@ package rwlock import ( + "context" "errors" "fmt" "time" - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" ) type lockerImpl struct { @@ -20,30 +21,30 @@ type lockerImpl struct { lockTTL string } -func (l *lockerImpl) Read(fn func()) error { - return l.do(fn, l.acquireReader, l.refreshReader, l.releaseReader) +func (l *lockerImpl) Read(ctx context.Context, fn func()) error { + return l.do(ctx, fn, l.acquireReader, l.refreshReader, l.releaseReader) } -func (l *lockerImpl) Write(fn func()) error { - return l.do(fn, l.acquireWriter, l.refreshWriter, l.releaseWriter) +func (l *lockerImpl) Write(ctx context.Context, fn func()) error { + return l.do(ctx, fn, l.acquireWriter, l.refreshWriter, l.releaseWriter) } -func (l *lockerImpl) do(fn func(), acquire func() (bool, error), refresh func() (bool, error), release func() (bool, error)) error { - if l.redisClient.Ping().Err() != nil { +func (l *lockerImpl) do(ctx context.Context, fn func(), acquire func(ctx context.Context) (bool, error), refresh func(ctx context.Context) (bool, error), release func(ctx context.Context) (bool, error)) error { + if l.redisClient.Ping(ctx).Err() != nil { return ErrConnection } stopRefreshing := make(chan struct{}) - acquired, err := l.execute(acquire, l.options.RetryCount) + acquired, err := l.execute(ctx, acquire, l.options.RetryCount) if err != nil { return err } if !acquired { return ErrTimeout } - go l.keepRefreshing(refresh, stopRefreshing) + go l.keepRefreshing(ctx, refresh, stopRefreshing) fnErr := l.runFn(fn) stopRefreshing <- struct{}{} - released, err := release() + released, err := release(ctx) if fnErr != nil { return fnErr } @@ -75,9 +76,9 @@ func (l *lockerImpl) runFn(fn func()) (err error) { return } -func (l *lockerImpl) execute(fn func() (bool, error), attempts int) (bool, error) { +func (l *lockerImpl) execute(ctx context.Context, fn func(ctx context.Context) (bool, error), attempts int) (bool, error) { for i := 0; i < attempts; i++ { - if ok, err := fn(); err != nil { + if ok, err := fn(ctx); err != nil { return false, err } else if ok { return true, nil @@ -98,7 +99,7 @@ func (l *lockerImpl) wait(d time.Duration) error { } } -func (l *lockerImpl) keepRefreshing(refresh func() (bool, error), stop chan struct{}) { +func (l *lockerImpl) keepRefreshing(ctx context.Context, refresh func(ctx context.Context) (bool, error), stop chan struct{}) { timeout := l.options.LockTTL / 2 timer := time.NewTicker(timeout) defer timer.Stop() @@ -110,12 +111,12 @@ func (l *lockerImpl) keepRefreshing(refresh func() (bool, error), stop chan stru case <-l.options.Context.Done(): return case <-timer.C: - refresh() + refresh(ctx) } } } -func (l *lockerImpl) acquireReader() (bool, error) { +func (l *lockerImpl) acquireReader(ctx context.Context) (bool, error) { var preferWriter = 0 switch l.options.Mode { case ModePreferWriter: @@ -125,48 +126,48 @@ func (l *lockerImpl) acquireReader() (bool, error) { default: return false, ErrUnknownMode } - return l.execScript(acquireReadLock, []string{ + return l.execScript(ctx, acquireReadLock, []string{ l.keyGlobalLock, l.keyReadersCount, l.keyWriterIntent, }, l.options.ReaderLockToken, l.lockTTL, preferWriter) } -func (l *lockerImpl) releaseReader() (bool, error) { - return l.execScript(releaseReadLock, []string{ +func (l *lockerImpl) releaseReader(ctx context.Context) (bool, error) { + return l.execScript(ctx, releaseReadLock, []string{ l.keyGlobalLock, l.keyReadersCount, }, l.options.ReaderLockToken) } -func (l *lockerImpl) refreshReader() (bool, error) { - return l.execScript(refreshLock, []string{ +func (l *lockerImpl) refreshReader(ctx context.Context) (bool, error) { + return l.execScript(ctx, refreshLock, []string{ l.keyGlobalLock, }, l.options.ReaderLockToken, l.lockTTL) } -func (l *lockerImpl) acquireWriter() (bool, error) { - return l.execScript(acquireWriteLock, []string{ +func (l *lockerImpl) acquireWriter(ctx context.Context) (bool, error) { + return l.execScript(ctx, acquireWriteLock, []string{ l.keyGlobalLock, l.keyReadersCount, l.keyWriterIntent, }, l.writerToken, l.lockTTL) } -func (l *lockerImpl) releaseWriter() (bool, error) { - return l.execScript(releaseWriteLock, []string{ +func (l *lockerImpl) releaseWriter(ctx context.Context) (bool, error) { + return l.execScript(ctx, releaseWriteLock, []string{ l.keyGlobalLock, }, l.writerToken) } -func (l *lockerImpl) refreshWriter() (bool, error) { - return l.execScript(refreshLock, []string{ +func (l *lockerImpl) refreshWriter(ctx context.Context) (bool, error) { + return l.execScript(ctx, refreshLock, []string{ l.keyGlobalLock, }, l.writerToken, l.lockTTL) } -func (l *lockerImpl) execScript(script *redis.Script, keys []string, args ...interface{}) (bool, error) { - status, err := script.Run(l.redisClient, keys, args...).Result() +func (l *lockerImpl) execScript(ctx context.Context, script *redis.Script, keys []string, args ...interface{}) (bool, error) { + status, err := script.Run(ctx, l.redisClient, keys, args...).Result() if err != nil { return false, err } diff --git a/pkg/rwlock/rwlock_scripts.go b/pkg/rwlock/rwlock_scripts.go index e693042..96ba23d 100644 --- a/pkg/rwlock/rwlock_scripts.go +++ b/pkg/rwlock/rwlock_scripts.go @@ -1,7 +1,7 @@ package rwlock import ( - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" ) var ( diff --git a/rwlock.go b/rwlock.go index afaed34..df39089 100644 --- a/rwlock.go +++ b/rwlock.go @@ -4,7 +4,7 @@ package rwlock import ( "github.com/e-chip/redis-rwlock/pkg/rwlock" - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" ) // Locker is an alias type to #rwlock.Locker