@@ -462,8 +462,7 @@ var _ = Describe("ClusterClient", func() {
462
462
Describe ("pipelining" , func () {
463
463
var pipe * redis.Pipeline
464
464
465
- assertPipeline := func () {
466
- keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
465
+ assertPipeline := func (keys []string ) {
467
466
468
467
It ("follows redirects" , func () {
469
468
if ! failover {
@@ -482,13 +481,12 @@ var _ = Describe("ClusterClient", func() {
482
481
Expect (err ).NotTo (HaveOccurred ())
483
482
Expect (cmds ).To (HaveLen (14 ))
484
483
485
- _ = client .ForEachShard (ctx , func (ctx context.Context , node * redis.Client ) error {
486
- defer GinkgoRecover ()
487
- Eventually (func () int64 {
488
- return node .DBSize (ctx ).Val ()
489
- }, 30 * time .Second ).ShouldNot (BeZero ())
490
- return nil
491
- })
484
+ // Check that all keys are set.
485
+ for _ , key := range keys {
486
+ Eventually (func () string {
487
+ return client .Get (ctx , key ).Val ()
488
+ }, 30 * time .Second ).Should (Equal (key + "_value" ))
489
+ }
492
490
493
491
if ! failover {
494
492
for _ , key := range keys {
@@ -517,14 +515,14 @@ var _ = Describe("ClusterClient", func() {
517
515
})
518
516
519
517
It ("works with missing keys" , func () {
520
- pipe .Set (ctx , "A" , "A_value" , 0 )
521
- pipe .Set (ctx , "C" , "C_value" , 0 )
518
+ pipe .Set (ctx , "A{s} " , "A_value" , 0 )
519
+ pipe .Set (ctx , "C{s} " , "C_value" , 0 )
522
520
_ , err := pipe .Exec (ctx )
523
521
Expect (err ).NotTo (HaveOccurred ())
524
522
525
- a := pipe .Get (ctx , "A" )
526
- b := pipe .Get (ctx , "B" )
527
- c := pipe .Get (ctx , "C" )
523
+ a := pipe .Get (ctx , "A{s} " )
524
+ b := pipe .Get (ctx , "B{s} " )
525
+ c := pipe .Get (ctx , "C{s} " )
528
526
cmds , err := pipe .Exec (ctx )
529
527
Expect (err ).To (Equal (redis .Nil ))
530
528
Expect (cmds ).To (HaveLen (3 ))
@@ -547,7 +545,8 @@ var _ = Describe("ClusterClient", func() {
547
545
548
546
AfterEach (func () {})
549
547
550
- assertPipeline ()
548
+ keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
549
+ assertPipeline (keys )
551
550
552
551
It ("doesn't fail node with context.Canceled error" , func () {
553
552
ctx , cancel := context .WithCancel (context .Background ())
@@ -590,7 +589,25 @@ var _ = Describe("ClusterClient", func() {
590
589
591
590
AfterEach (func () {})
592
591
593
- assertPipeline ()
592
+ // TxPipeline doesn't support cross slot commands.
593
+ // Use hashtag to force all keys to the same slot.
594
+ keys := []string {"A{s}" , "B{s}" , "C{s}" , "D{s}" , "E{s}" , "F{s}" , "G{s}" }
595
+ assertPipeline (keys )
596
+
597
+ // make sure CrossSlot error is returned
598
+ It ("returns CrossSlot error" , func () {
599
+ pipe .Set (ctx , "A{s}" , "A_value" , 0 )
600
+ pipe .Set (ctx , "B{t}" , "B_value" , 0 )
601
+ Expect (hashtag .Slot ("A{s}" )).NotTo (Equal (hashtag .Slot ("B{t}" )))
602
+ _ , err := pipe .Exec (ctx )
603
+ Expect (err ).To (MatchError (redis .ErrCrossSlot ))
604
+ })
605
+
606
+ // doesn't fail when no commands are queued
607
+ It ("returns no error when there are no commands" , func () {
608
+ _ , err := pipe .Exec (ctx )
609
+ Expect (err ).NotTo (HaveOccurred ())
610
+ })
594
611
})
595
612
})
596
613
0 commit comments