Skip to content

Update redis client v9 #10

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions examples/prefer_writer/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"context"
"fmt"
"sync"
"time"

rwlock "github.com/e-chip/redis-rwlock"
"github.com/go-redis/redis"
"github.com/redis/go-redis/v9"
)

const (
Expand All @@ -17,6 +18,7 @@ const (
)

type example struct {
ctx context.Context
locker rwlock.Locker
wg sync.WaitGroup
doneC chan struct{}
Expand All @@ -26,7 +28,7 @@ func (e *example) WriteSharedData(sharedData *int) {
e.wg.Add(1)
go func() {
for i := 0; i < writeIterations; i++ {
err := e.locker.Write(func() {
err := e.locker.Write(e.ctx, func() {
fmt.Printf("Writing...\n")
time.Sleep(writeDuration)
(*sharedData)++
Expand All @@ -51,7 +53,7 @@ func (e *example) ReadSharedData(sharedData *int) {
e.wg.Done()
return
default:
err := e.locker.Read(func() {
err := e.locker.Read(e.ctx, func() {
fmt.Printf("Read: %d\n", *sharedData)
})
if err != nil {
Expand All @@ -75,6 +77,7 @@ func main() {
DB: 9,
})
example = example{
ctx: context.Background(),
locker: rwlock.New(redisClient, "GLOBAL_LOCK", "READER_COUNT", "WRITER_INTENT", &rwlock.Options{}),
doneC: make(chan struct{}),
}
Expand Down
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
module github.com/e-chip/redis-rwlock

go 1.13
go 1.17

require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/gofrs/uuid v3.3.0+incompatible
github.com/redis/go-redis/v9 v9.2.1
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
32 changes: 30 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,32 @@
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/ginkgo/v2 v2.5.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/gomega v1.20.0/go.mod h1:JifAceMQ4crZIWYUKrlGcmbN3bqHogVTADMD2ATsbwk=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84=
github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
7 changes: 4 additions & 3 deletions pkg/rwlock/rwlock.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package rwlock

import (
"context"
"errors"
"strconv"
"time"

"github.com/go-redis/redis"
"github.com/gofrs/uuid"
"github.com/redis/go-redis/v9"
)

var (
Expand All @@ -25,9 +26,9 @@ var (
// Locker allows to execute given functions at reader or writer access privilege.
type Locker interface {
// Read executes given function with shared reader access.
Read(fn func()) error
Read(ctx context.Context, fn func()) error
// Write executes given function with unique writer access.
Write(fn func()) error
Write(ctx context.Context, fn func()) error
}

// New instance of RW-Locker.
Expand Down
57 changes: 29 additions & 28 deletions pkg/rwlock/rwlock_impl.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package rwlock

import (
"context"
"errors"
"fmt"
"time"

"github.com/go-redis/redis"
"github.com/redis/go-redis/v9"
)

type lockerImpl struct {
Expand All @@ -20,30 +21,30 @@ type lockerImpl struct {
lockTTL string
}

func (l *lockerImpl) Read(fn func()) error {
return l.do(fn, l.acquireReader, l.refreshReader, l.releaseReader)
func (l *lockerImpl) Read(ctx context.Context, fn func()) error {
return l.do(ctx, fn, l.acquireReader, l.refreshReader, l.releaseReader)
}

func (l *lockerImpl) Write(fn func()) error {
return l.do(fn, l.acquireWriter, l.refreshWriter, l.releaseWriter)
func (l *lockerImpl) Write(ctx context.Context, fn func()) error {
return l.do(ctx, fn, l.acquireWriter, l.refreshWriter, l.releaseWriter)
}

func (l *lockerImpl) do(fn func(), acquire func() (bool, error), refresh func() (bool, error), release func() (bool, error)) error {
if l.redisClient.Ping().Err() != nil {
func (l *lockerImpl) do(ctx context.Context, fn func(), acquire func(ctx context.Context) (bool, error), refresh func(ctx context.Context) (bool, error), release func(ctx context.Context) (bool, error)) error {
if l.redisClient.Ping(ctx).Err() != nil {
return ErrConnection
}
stopRefreshing := make(chan struct{})
acquired, err := l.execute(acquire, l.options.RetryCount)
acquired, err := l.execute(ctx, acquire, l.options.RetryCount)
if err != nil {
return err
}
if !acquired {
return ErrTimeout
}
go l.keepRefreshing(refresh, stopRefreshing)
go l.keepRefreshing(ctx, refresh, stopRefreshing)
fnErr := l.runFn(fn)
stopRefreshing <- struct{}{}
released, err := release()
released, err := release(ctx)
if fnErr != nil {
return fnErr
}
Expand Down Expand Up @@ -75,9 +76,9 @@ func (l *lockerImpl) runFn(fn func()) (err error) {
return
}

func (l *lockerImpl) execute(fn func() (bool, error), attempts int) (bool, error) {
func (l *lockerImpl) execute(ctx context.Context, fn func(ctx context.Context) (bool, error), attempts int) (bool, error) {
for i := 0; i < attempts; i++ {
if ok, err := fn(); err != nil {
if ok, err := fn(ctx); err != nil {
return false, err
} else if ok {
return true, nil
Expand All @@ -98,7 +99,7 @@ func (l *lockerImpl) wait(d time.Duration) error {
}
}

func (l *lockerImpl) keepRefreshing(refresh func() (bool, error), stop chan struct{}) {
func (l *lockerImpl) keepRefreshing(ctx context.Context, refresh func(ctx context.Context) (bool, error), stop chan struct{}) {
timeout := l.options.LockTTL / 2
timer := time.NewTicker(timeout)
defer timer.Stop()
Expand All @@ -110,12 +111,12 @@ func (l *lockerImpl) keepRefreshing(refresh func() (bool, error), stop chan stru
case <-l.options.Context.Done():
return
case <-timer.C:
refresh()
refresh(ctx)
}
}
}

func (l *lockerImpl) acquireReader() (bool, error) {
func (l *lockerImpl) acquireReader(ctx context.Context) (bool, error) {
var preferWriter = 0
switch l.options.Mode {
case ModePreferWriter:
Expand All @@ -125,48 +126,48 @@ func (l *lockerImpl) acquireReader() (bool, error) {
default:
return false, ErrUnknownMode
}
return l.execScript(acquireReadLock, []string{
return l.execScript(ctx, acquireReadLock, []string{
l.keyGlobalLock,
l.keyReadersCount,
l.keyWriterIntent,
}, l.options.ReaderLockToken, l.lockTTL, preferWriter)
}

func (l *lockerImpl) releaseReader() (bool, error) {
return l.execScript(releaseReadLock, []string{
func (l *lockerImpl) releaseReader(ctx context.Context) (bool, error) {
return l.execScript(ctx, releaseReadLock, []string{
l.keyGlobalLock,
l.keyReadersCount,
}, l.options.ReaderLockToken)
}

func (l *lockerImpl) refreshReader() (bool, error) {
return l.execScript(refreshLock, []string{
func (l *lockerImpl) refreshReader(ctx context.Context) (bool, error) {
return l.execScript(ctx, refreshLock, []string{
l.keyGlobalLock,
}, l.options.ReaderLockToken, l.lockTTL)
}

func (l *lockerImpl) acquireWriter() (bool, error) {
return l.execScript(acquireWriteLock, []string{
func (l *lockerImpl) acquireWriter(ctx context.Context) (bool, error) {
return l.execScript(ctx, acquireWriteLock, []string{
l.keyGlobalLock,
l.keyReadersCount,
l.keyWriterIntent,
}, l.writerToken, l.lockTTL)
}

func (l *lockerImpl) releaseWriter() (bool, error) {
return l.execScript(releaseWriteLock, []string{
func (l *lockerImpl) releaseWriter(ctx context.Context) (bool, error) {
return l.execScript(ctx, releaseWriteLock, []string{
l.keyGlobalLock,
}, l.writerToken)
}

func (l *lockerImpl) refreshWriter() (bool, error) {
return l.execScript(refreshLock, []string{
func (l *lockerImpl) refreshWriter(ctx context.Context) (bool, error) {
return l.execScript(ctx, refreshLock, []string{
l.keyGlobalLock,
}, l.writerToken, l.lockTTL)
}

func (l *lockerImpl) execScript(script *redis.Script, keys []string, args ...interface{}) (bool, error) {
status, err := script.Run(l.redisClient, keys, args...).Result()
func (l *lockerImpl) execScript(ctx context.Context, script *redis.Script, keys []string, args ...interface{}) (bool, error) {
status, err := script.Run(ctx, l.redisClient, keys, args...).Result()
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rwlock/rwlock_scripts.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package rwlock

import (
"github.com/go-redis/redis"
"github.com/redis/go-redis/v9"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion rwlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package rwlock

import (
"github.com/e-chip/redis-rwlock/pkg/rwlock"
"github.com/go-redis/redis"
"github.com/redis/go-redis/v9"
)

// Locker is an alias type to #rwlock.Locker
Expand Down