From f18e61333e7a59533efaae271710cc32080920f0 Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Mon, 27 Jan 2025 21:42:03 +0900 Subject: [PATCH 1/8] feat: add connection lifetime option to single client --- mux_test.go | 4 ++++ pipe.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++- pool.go | 2 ++ rueidis.go | 9 +++++++++ 4 files changed, 62 insertions(+), 1 deletion(-) diff --git a/mux_test.go b/mux_test.go index cddcfa29..72a30b18 100644 --- a/mux_test.go +++ b/mux_test.go @@ -1135,6 +1135,10 @@ func (m *mockWire) SetOnCloseHook(fn func(error)) { } } +func (m *mockWire) StopTimer() {} + +func (m *mockWire) ResetTimer() {} + func (m *mockWire) Info() map[string]RedisMessage { if m.InfoFn != nil { return m.InfoFn() diff --git a/pipe.go b/pipe.go index 219ef4be..c7428584 100644 --- a/pipe.go +++ b/pipe.go @@ -55,6 +55,8 @@ type wire interface { CleanSubscriptions() SetPubSubHooks(hooks PubSubHooks) <-chan error SetOnCloseHook(fn func(error)) + StopTimer() + ResetTimer() } var _ wire = (*pipe)(nil) @@ -89,6 +91,10 @@ type pipe struct { recvs int32 r2ps bool // identify this pipe is used for resp2 pubsub or not noNoDelay bool + lftm time.Duration // lifetime + lftmMu sync.Mutex // guards lifetime timer + lftmOn bool + lftmTimer *time.Timer // lifetime timer } type pipeFn func(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) @@ -326,6 +332,10 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg go p.backgroundPing() } } + if option.ConnLifetime > 0 { + p.lftm = option.ConnLifetime + p.lftmTimer = time.AfterFunc(option.ConnLifetime, p.expired) + } return p, nil } @@ -373,6 +383,7 @@ func (p *pipe) _background() { }() } } + p.StopTimer() err := p.Error() p.nsubs.Close() p.psubs.Close() @@ -1576,6 +1587,7 @@ func (p *pipe) Close() { } atomic.AddInt32(&p.waits, -1) atomic.AddInt32(&p.blcksig, -1) + p.StopTimer() if p.conn != nil { p.conn.Close() } @@ -1586,6 +1598,37 @@ func (p *pipe) Close() { p.r2mu.Unlock() } +func (p *pipe) StopTimer() { + if p.lftmTimer == nil { + return + } + p.lftmMu.Lock() + defer p.lftmMu.Unlock() + if !p.lftmOn { + return + } + p.lftmOn = false + p.lftmTimer.Stop() +} + +func (p *pipe) ResetTimer() { + if p.lftmTimer == nil || p.Error() != nil { + return + } + p.lftmMu.Lock() + defer p.lftmMu.Unlock() + if p.lftmOn { + return + } + p.lftmOn = true + p.lftmTimer.Reset(p.lftm) +} + +func (p *pipe) expired() { + p.error.CompareAndSwap(nil, errExpired) + p.Close() +} + type pshks struct { hooks PubSubHooks close chan error @@ -1625,6 +1668,9 @@ const ( ) var cacheMark = &(RedisMessage{}) -var errClosing = &errs{error: ErrClosing} +var ( + errClosing = &errs{error: ErrClosing} + errExpired = &errs{error: errConnExpired} +) type errs struct{ error } diff --git a/pool.go b/pool.go index c3e07d88..7273b369 100644 --- a/pool.go +++ b/pool.go @@ -59,6 +59,7 @@ retry: } } p.cond.L.Unlock() + v.StopTimer() return v } @@ -67,6 +68,7 @@ func (p *pool) Store(v wire) { if !p.down && v.Error() == nil { p.list = append(p.list, v) p.startTimerIfNeeded() + v.ResetTimer() } else { p.size-- v.Close() diff --git a/rueidis.go b/rueidis.go index 57d8dcbd..9e3ff260 100644 --- a/rueidis.go +++ b/rueidis.go @@ -161,6 +161,10 @@ type ClientOption struct { // This default is ClientOption.Dialer.KeepAlive * (9+1), where 9 is the default of tcp_keepalive_probes on Linux. ConnWriteTimeout time.Duration + // ConnLiftime is lifetime for each connection. 
If specified, + // connections will close after passing lifetime. Note that the connection which dedicated client and blocking use is not closed. + ConnLifetime time.Duration + // MaxFlushDelay when greater than zero pauses pipeline write loop for some time (not larger than MaxFlushDelay) // after each flushing of data to the connection. This gives pipeline a chance to collect more commands to send // to Redis. Adding this delay increases latency, reduces throughput – but in most cases may significantly reduce @@ -464,3 +468,8 @@ func dial(dst string, opt *ClientOption) (conn net.Conn, err error) { } const redisErrMsgCommandNotAllow = "command is not allowed" + +var ( + // errConnExpired means wrong connection that ClientOption.ConnLifetime had passed since connecting + errConnExpired = errors.New("connection is expired") +) From f950c1e7294962a119774ffb33f4b6271278d932 Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Sat, 8 Feb 2025 18:40:31 +0900 Subject: [PATCH 2/8] Remove mutex and timer flag for connection lifetime timer --- pipe.go | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/pipe.go b/pipe.go index c7428584..7284bb3a 100644 --- a/pipe.go +++ b/pipe.go @@ -55,8 +55,8 @@ type wire interface { CleanSubscriptions() SetPubSubHooks(hooks PubSubHooks) <-chan error SetOnCloseHook(fn func(error)) - StopTimer() - ResetTimer() + StopTimer() bool + ResetTimer() bool } var _ wire = (*pipe)(nil) @@ -92,9 +92,7 @@ type pipe struct { r2ps bool // identify this pipe is used for resp2 pubsub or not noNoDelay bool lftm time.Duration // lifetime - lftmMu sync.Mutex // guards lifetime timer - lftmOn bool - lftmTimer *time.Timer // lifetime timer + lftmTimer *time.Timer // lifetime timer } type pipeFn func(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) @@ -350,6 +348,7 @@ func (p *pipe) _exit(err error) { p.error.CompareAndSwap(nil, &errs{error: err}) atomic.CompareAndSwapInt32(&p.state, 1, 2) // stop accepting new requests _ = p.conn.Close() // force both read & write goroutine to exit + p.StopTimer() p.clhks.Load().(func(error))(err) } @@ -383,7 +382,6 @@ func (p *pipe) _background() { }() } } - p.StopTimer() err := p.Error() p.nsubs.Close() p.psubs.Close() @@ -1598,30 +1596,18 @@ func (p *pipe) Close() { p.r2mu.Unlock() } -func (p *pipe) StopTimer() { +func (p *pipe) StopTimer() bool { if p.lftmTimer == nil { - return - } - p.lftmMu.Lock() - defer p.lftmMu.Unlock() - if !p.lftmOn { - return + return true } - p.lftmOn = false - p.lftmTimer.Stop() + return p.lftmTimer.Stop() } -func (p *pipe) ResetTimer() { +func (p *pipe) ResetTimer() bool { if p.lftmTimer == nil || p.Error() != nil { - return - } - p.lftmMu.Lock() - defer p.lftmMu.Unlock() - if p.lftmOn { - return + return true } - p.lftmOn = true - p.lftmTimer.Reset(p.lftm) + return p.lftmTimer.Reset(p.lftm) } func (p *pipe) expired() { From 390e19b48220265d50a59d22624e8280b55b009b Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Sat, 8 Feb 2025 18:41:18 +0900 Subject: [PATCH 3/8] Retry wire accquition when failed to stop connection lifetime timer --- pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index 7273b369..63808a1a 100644 --- a/pool.go +++ b/pool.go @@ -47,19 +47,19 @@ retry: } else if len(p.list) == 0 { p.size++ v = p.make() + v.StopTimer() } else { i := len(p.list) - 1 v = p.list[i] p.list[i] = nil p.list = p.list[:i] - if v.Error() != nil { 
+ if !v.StopTimer() || v.Error() != nil { p.size-- v.Close() goto retry } } p.cond.L.Unlock() - v.StopTimer() return v } From 88c8d7e6b64329f2170343c60794f12a92c831ee Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Sat, 8 Feb 2025 18:44:28 +0900 Subject: [PATCH 4/8] Add timer test to pipe --- mux_test.go | 4 ++-- pipe_test.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/mux_test.go b/mux_test.go index 72a30b18..d4fc76c8 100644 --- a/mux_test.go +++ b/mux_test.go @@ -1135,9 +1135,9 @@ func (m *mockWire) SetOnCloseHook(fn func(error)) { } } -func (m *mockWire) StopTimer() {} +func (m *mockWire) StopTimer() bool { return true } -func (m *mockWire) ResetTimer() {} +func (m *mockWire) ResetTimer() bool { return true } func (m *mockWire) Info() map[string]RedisMessage { if m.InfoFn != nil { diff --git a/pipe_test.go b/pipe_test.go index 6e187c6e..70b1232e 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -2586,6 +2586,66 @@ func TestOnInvalidations(t *testing.T) { } } +func TestConnLifetime(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + + t.Run("Enabled ConnLifetime", func(t *testing.T) { + p, _, _, closeConn := setup(t, ClientOption{ + ConnLifetime: 50 * time.Millisecond, + }) + defer closeConn() + + if p.Error() != nil { + t.Fatalf("unexpected error %v", p.Error()) + } + time.Sleep(60 * time.Millisecond) + if p.Error() != errConnExpired { + t.Fatalf("unexpected error, expected: %v, got: %v", errConnExpired, p.Error()) + } + }) + + t.Run("Disabled ConnLifetime", func(t *testing.T) { + p, _, _, closeConn := setup(t, ClientOption{}) + defer closeConn() + + time.Sleep(60 * time.Millisecond) + if p.Error() != nil { + t.Fatalf("unexpected error %v", p.Error()) + } + }) + + t.Run("StopTimer", func(t *testing.T) { + p, _, _, closeConn := setup(t, ClientOption{ + ConnLifetime: 50 * time.Millisecond, + }) + defer closeConn() + + p.StopTimer() + time.Sleep(60 * time.Millisecond) + if p.Error() != nil { + t.Fatalf("unexpected error %v", p.Error()) + } + }) + + t.Run("ResetTimer", func(t *testing.T) { + p, _, _, closeConn := setup(t, ClientOption{ + ConnLifetime: 50 * time.Millisecond, + }) + defer closeConn() + + time.Sleep(20 * time.Millisecond) + p.ResetTimer() + time.Sleep(40 * time.Millisecond) + if p.Error() != nil { + t.Fatalf("unexpected error %v", p.Error()) + } + time.Sleep(20 * time.Millisecond) + if p.Error() != errConnExpired { + t.Fatalf("unexpected error, expected: %v, got: %v", errConnExpired, p.Error()) + } + }) +} + func TestMultiHalfErr(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) p, mock, _, closeConn := setup(t, ClientOption{}) From 14349d45d906eee8df1501ce2e9865a0ea83beb3 Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Sat, 8 Feb 2025 22:57:40 +0900 Subject: [PATCH 5/8] Add test for reseting timer and stopping timer when using pool --- mux_test.go | 16 +++++++++-- pool_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/mux_test.go b/mux_test.go index d4fc76c8..5ee0f4a0 100644 --- a/mux_test.go +++ b/mux_test.go @@ -1061,6 +1061,8 @@ type mockWire struct { VersionFn func() int ErrorFn func() error CloseFn func() + StopTimerFn func() bool + ResetTimerFn func() bool CleanSubscriptionsFn func() SetPubSubHooksFn func(hooks PubSubHooks) <-chan error @@ -1135,9 +1137,19 @@ func (m *mockWire) SetOnCloseHook(fn func(error)) { } } -func (m 
*mockWire) StopTimer() bool { return true } +func (m *mockWire) StopTimer() bool { + if m.StopTimerFn != nil { + return m.StopTimerFn() + } + return true +} -func (m *mockWire) ResetTimer() bool { return true } +func (m *mockWire) ResetTimer() bool { + if m.ResetTimerFn != nil { + return m.ResetTimerFn() + } + return true +} func (m *mockWire) Info() map[string]RedisMessage { if m.InfoFn != nil { diff --git a/pool_test.go b/pool_test.go index f0cb986f..563a0d1d 100644 --- a/pool_test.go +++ b/pool_test.go @@ -329,3 +329,80 @@ func TestPoolWithIdleTTL(t *testing.T) { p.Close() }) } + +func TestPoolWithConnLifetime(t *testing.T) { + defer ShouldNotLeaked(SetupLeakDetection()) + setup := func(wires []wire) *pool { + var count int32 + return newPool(len(wires), dead, 0, 0, func() wire { + idx := atomic.AddInt32(&count, 1) - 1 + return wires[idx] + }) + } + + t.Run("Reuse without expired connections", func(t *testing.T) { + stopTimerCall := 0 + wires := []wire{ + &mockWire{}, + &mockWire{ + StopTimerFn: func() bool { + stopTimerCall++ + return false + }, // connection lifetime timer is already fired + }, + } + conn := make([]wire, 0, len(wires)) + pool := setup(wires) + for i := 0; i < len(wires); i++ { + conn = append(conn, pool.Acquire()) + } + for i := 0; i < len(conn); i++ { + pool.Store(conn[i]) + } + + if stopTimerCall != 1 { + t.Errorf("StopTimer must be called when making wire") + } + + pool.cond.L.Lock() + if pool.size != 2 { + t.Errorf("size must be equal to 2, actual: %d", pool.size) + } + if len(pool.list) != 2 { + t.Errorf("list len must equal to 2, actual: %d", len(pool.list)) + } + pool.cond.L.Unlock() + + // stop timer failed, so drop the expired connection + pool.Store(pool.Acquire()) + + if stopTimerCall != 2 { + t.Errorf("StopTimer must be called when acquiring from pool") + } + + pool.cond.L.Lock() + if pool.size != 1 { + t.Errorf("size must be equal to 1, actual: %d", pool.size) + } + if len(pool.list) != 1 { + t.Errorf("list len must equal to 1, actual: %d", len(pool.list)) + } + pool.cond.L.Unlock() + }) + + t.Run("Reset timer when storing to pool", func(t *testing.T) { + call := false + w := &mockWire{ + ResetTimerFn: func() bool { + call = true + return true + }, + } + pool := setup([]wire{w}) + pool.Store(pool.Acquire()) + + if !call { + t.Error("ResetTimer must be called when storing") + } + }) +} From e91d31650d77b9cb1a56a120aa6e64fb6749c938 Mon Sep 17 00:00:00 2001 From: Terunori Togo <603380+terut@users.noreply.github.com> Date: Mon, 10 Feb 2025 14:55:28 +0900 Subject: [PATCH 6/8] Remove p.StopTimer() from p.Close() Co-authored-by: Rueian --- pipe.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pipe.go b/pipe.go index 7284bb3a..dea0c8bc 100644 --- a/pipe.go +++ b/pipe.go @@ -1585,7 +1585,6 @@ func (p *pipe) Close() { } atomic.AddInt32(&p.waits, -1) atomic.AddInt32(&p.blcksig, -1) - p.StopTimer() if p.conn != nil { p.conn.Close() } From 9fba892ba0a95d8a40e0c7f756d2dd007e859da2 Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Mon, 10 Feb 2025 16:23:36 +0900 Subject: [PATCH 7/8] Forced to retry when errConnExpired --- client.go | 30 +++++++++++++++++++++++++++--- cluster.go | 2 +- sentinel.go | 2 +- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index d3159a83..afd95320 100644 --- a/client.go +++ b/client.go @@ -15,6 +15,7 @@ type singleClient struct { retryHandler retryHandler stop uint32 retry bool + hasConnLftm bool DisableCache bool } @@ -32,11 +33,11 @@ func newSingleClient(opt 
*ClientOption, prev conn, connFn connFn, retryer retryH if err := conn.Dial(); err != nil { return nil, err } - return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, retryer), nil + return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, opt.ConnLifetime > 0, retryer), nil } -func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool, retryer retryHandler) *singleClient { - return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, DisableCache: disableCache} +func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache, hasConnLftm bool, retryer retryHandler) *singleClient { + return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, hasConnLftm: hasConnLftm, DisableCache: disableCache} } func (c *singleClient) B() Builder { @@ -47,6 +48,9 @@ func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult) attempts := 1 retry: resp = c.conn.Do(ctx, cmd) + if c.hasConnLftm && resp.Error() == errConnExpired { + goto retry + } if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry( ctx, attempts, cmd, resp.Error(), @@ -86,6 +90,13 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [ attempts := 1 retry: resps = c.conn.DoMulti(ctx, multi...).s + if c.hasConnLftm { + for _, resp := range resps { + if resp.Error() == errConnExpired { + goto retry + } + } + } if c.retry && allReadOnly(multi) { for i, resp := range resps { if c.isRetryable(resp.Error(), ctx) { @@ -114,6 +125,13 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL) attempts := 1 retry: resps = c.conn.DoMultiCache(ctx, multi...).s + if c.hasConnLftm { + for _, resp := range resps { + if resp.Error() == errConnExpired { + goto retry + } + } + } if c.retry { for i, resp := range resps { if c.isRetryable(resp.Error(), ctx) { @@ -139,6 +157,9 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura attempts := 1 retry: resp = c.conn.DoCache(ctx, cmd, ttl) + if c.hasConnLftm && resp.Error() == errConnExpired { + goto retry + } if c.retry && c.isRetryable(resp.Error(), ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error()) if shouldRetry { @@ -156,6 +177,9 @@ func (c *singleClient) Receive(ctx context.Context, subscribe Completed, fn func attempts := 1 retry: err = c.conn.Receive(ctx, subscribe, fn) + if c.hasConnLftm && err == errConnExpired { + goto retry + } if c.retry { if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) { shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err) diff --git a/cluster.go b/cluster.go index 4cf04e88..e20502b3 100644 --- a/cluster.go +++ b/cluster.go @@ -1196,7 +1196,7 @@ func (c *clusterClient) Nodes() map[string]Client { disableCache := c.opt != nil && c.opt.DisableCache for addr, cc := range c.conns { if !cc.hidden { - _nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler) + _nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, false, c.retryHandler) } } c.mu.RUnlock() diff --git a/sentinel.go b/sentinel.go index 4518b187..9792b911 100644 --- a/sentinel.go +++ b/sentinel.go @@ -215,7 +215,7 @@ func (c *sentinelClient) Dedicate() (DedicatedClient, func()) { func (c *sentinelClient) Nodes() map[string]Client { conn := 
c.mConn.Load().(conn) disableCache := c.mOpt != nil && c.mOpt.DisableCache - return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry, disableCache, c.retryHandler)} + return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry, disableCache, false, c.retryHandler)} } func (c *sentinelClient) Close() { From c0c365740db9eeeae1d0833a475b1737078d8423 Mon Sep 17 00:00:00 2001 From: terut <603380+terut@users.noreply.github.com> Date: Mon, 10 Feb 2025 20:34:24 +0900 Subject: [PATCH 8/8] Remove hasConnLftm and check resps[0] to retry for multi cmds --- client.go | 29 ++++++++++------------------- cluster.go | 2 +- sentinel.go | 2 +- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index afd95320..abfe2929 100644 --- a/client.go +++ b/client.go @@ -15,7 +15,6 @@ type singleClient struct { retryHandler retryHandler stop uint32 retry bool - hasConnLftm bool DisableCache bool } @@ -33,11 +32,11 @@ func newSingleClient(opt *ClientOption, prev conn, connFn connFn, retryer retryH if err := conn.Dial(); err != nil { return nil, err } - return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, opt.ConnLifetime > 0, retryer), nil + return newSingleClientWithConn(conn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, retryer), nil } -func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache, hasConnLftm bool, retryer retryHandler) *singleClient { - return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, hasConnLftm: hasConnLftm, DisableCache: disableCache} +func newSingleClientWithConn(conn conn, builder Builder, retry, disableCache bool, retryer retryHandler) *singleClient { + return &singleClient{cmd: builder, conn: conn, retry: retry, retryHandler: retryer, DisableCache: disableCache} } func (c *singleClient) B() Builder { @@ -48,7 +47,7 @@ func (c *singleClient) Do(ctx context.Context, cmd Completed) (resp RedisResult) attempts := 1 retry: resp = c.conn.Do(ctx, cmd) - if c.hasConnLftm && resp.Error() == errConnExpired { + if resp.Error() == errConnExpired { goto retry } if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) { @@ -90,12 +89,8 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [ attempts := 1 retry: resps = c.conn.DoMulti(ctx, multi...).s - if c.hasConnLftm { - for _, resp := range resps { - if resp.Error() == errConnExpired { - goto retry - } - } + if resps[0].Error() == errConnExpired { + goto retry } if c.retry && allReadOnly(multi) { for i, resp := range resps { @@ -125,12 +120,8 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL) attempts := 1 retry: resps = c.conn.DoMultiCache(ctx, multi...).s - if c.hasConnLftm { - for _, resp := range resps { - if resp.Error() == errConnExpired { - goto retry - } - } + if resps[0].Error() == errConnExpired { + goto retry } if c.retry { for i, resp := range resps { @@ -157,7 +148,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura attempts := 1 retry: resp = c.conn.DoCache(ctx, cmd, ttl) - if c.hasConnLftm && resp.Error() == errConnExpired { + if resp.Error() == errConnExpired { goto retry } if c.retry && c.isRetryable(resp.Error(), ctx) { @@ -177,7 +168,7 @@ func (c *singleClient) Receive(ctx context.Context, subscribe Completed, fn func attempts := 1 retry: err = c.conn.Receive(ctx, subscribe, fn) - if c.hasConnLftm && err == errConnExpired { + if 
err == errConnExpired { goto retry } if c.retry { diff --git a/cluster.go b/cluster.go index e20502b3..4cf04e88 100644 --- a/cluster.go +++ b/cluster.go @@ -1196,7 +1196,7 @@ func (c *clusterClient) Nodes() map[string]Client { disableCache := c.opt != nil && c.opt.DisableCache for addr, cc := range c.conns { if !cc.hidden { - _nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, false, c.retryHandler) + _nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler) } } c.mu.RUnlock() diff --git a/sentinel.go b/sentinel.go index 9792b911..4518b187 100644 --- a/sentinel.go +++ b/sentinel.go @@ -215,7 +215,7 @@ func (c *sentinelClient) Dedicate() (DedicatedClient, func()) { func (c *sentinelClient) Nodes() map[string]Client { conn := c.mConn.Load().(conn) disableCache := c.mOpt != nil && c.mOpt.DisableCache - return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry, disableCache, false, c.retryHandler)} + return map[string]Client{conn.Addr(): newSingleClientWithConn(conn, c.cmd, c.retry, disableCache, c.retryHandler)} } func (c *sentinelClient) Close() {
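
Usage sketch (editorial addition, not part of the patch series). The new ClientOption.ConnLifetime field caps how long a pooled connection may live; once the lifetime elapses the pipe is closed with errConnExpired and the single client retries the affected command on a fresh connection (see the errConnExpired checks added to client.go in PATCH 7/8 and 8/8). A minimal way to enable it could look like the following; the import path, address, and the 30-minute value are assumptions for illustration only.

    package main

    import (
    	"context"
    	"time"

    	"github.com/redis/rueidis" // assumed import path
    )

    func main() {
    	client, err := rueidis.NewClient(rueidis.ClientOption{
    		InitAddress:  []string{"127.0.0.1:6379"}, // placeholder address
    		ConnLifetime: 30 * time.Minute,           // recycle each connection after 30 minutes
    	})
    	if err != nil {
    		panic(err)
    	}
    	defer client.Close()

    	// If the connection expired just before this call, the command is
    	// retried automatically rather than surfacing errConnExpired.
    	ctx := context.Background()
    	if err := client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error(); err != nil {
    		panic(err)
    	}
    }

On the timer handling in PATCH 2/8 and 3/8: StopTimer and ResetTimer return the bool from time.Timer.Stop and time.Timer.Reset, so a false from StopTimer in pool.Acquire means the lifetime timer has already fired and the wire is being closed as expired; the pool therefore drops that wire and retries the acquisition. A standalone sketch of that standard-library behavior, independent of rueidis:

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	fired := make(chan struct{})
    	t := time.AfterFunc(10*time.Millisecond, func() { close(fired) })
    	<-fired
    	// Stop reports false once the callback has already run, which is the
    	// same signal pool.Acquire uses to discard an expired wire and retry.
    	fmt.Println(t.Stop()) // prints "false"
    }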