1 package redis_test
2
3 import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "net"
9 "slices"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 . "github.com/bsm/ginkgo/v2"
17 . "github.com/bsm/gomega"
18
19 "github.com/redis/go-redis/v9"
20 "github.com/redis/go-redis/v9/internal/hashtag"
21 )
22
23 type clusterScenario struct {
24 ports []string
25 nodeIDs []string
26 clients map[string]*redis.Client
27 }
28
29 func (s *clusterScenario) slots() []int {
30 return []int{0, 5461, 10923, 16384}
31 }
32
33 func (s *clusterScenario) masters() []*redis.Client {
34 result := make([]*redis.Client, 3)
35 for pos, port := range s.ports[:3] {
36 result[pos] = s.clients[port]
37 }
38 return result
39 }
40
41 func (s *clusterScenario) slaves() []*redis.Client {
42 result := make([]*redis.Client, 3)
43 for pos, port := range s.ports[3:] {
44 result[pos] = s.clients[port]
45 }
46 return result
47 }
48
49 func (s *clusterScenario) addrs() []string {
50 addrs := make([]string, len(s.ports))
51 for i, port := range s.ports {
52 addrs[i] = net.JoinHostPort("127.0.0.1", port)
53 }
54 return addrs
55 }
56
57 func (s *clusterScenario) newClusterClientUnstable(opt *redis.ClusterOptions) *redis.ClusterClient {
58 opt.Addrs = s.addrs()
59 return redis.NewClusterClient(opt)
60 }
61
62 func (s *clusterScenario) newClusterClient(
63 ctx context.Context, opt *redis.ClusterOptions,
64 ) *redis.ClusterClient {
65 client := s.newClusterClientUnstable(opt)
66
67 err := eventually(func() error {
68 if opt.ClusterSlots != nil {
69 return nil
70 }
71
72 state, err := client.LoadState(ctx)
73 if err != nil {
74 return err
75 }
76
77 if !state.IsConsistent(ctx) {
78 return fmt.Errorf("cluster state is not consistent")
79 }
80
81 return nil
82 }, 30*time.Second)
83 if err != nil {
84 panic(err)
85 }
86
87 return client
88 }
89
90 func (s *clusterScenario) Close() error {
91 ctx := context.TODO()
92 for _, master := range s.masters() {
93 if master == nil {
94 continue
95 }
96 err := master.FlushAll(ctx).Err()
97 if err != nil {
98 return err
99 }
100
101
102
103 for _, nID := range s.nodeIDs {
104 master.ClusterForget(ctx, nID)
105 }
106 }
107
108 return nil
109 }
110
111 func configureClusterTopology(ctx context.Context, scenario *clusterScenario) error {
112 allowErrs := []string{
113 "ERR Slot 0 is already busy",
114 "ERR Slot 5461 is already busy",
115 "ERR Slot 10923 is already busy",
116 "ERR Slot 16384 is already busy",
117 }
118
119 err := collectNodeInformation(ctx, scenario)
120 if err != nil {
121 return err
122 }
123
124
125 for _, client := range scenario.clients {
126 err := client.ClusterMeet(ctx, "127.0.0.1", scenario.ports[0]).Err()
127 if err != nil {
128 return err
129 }
130 }
131
132 slots := scenario.slots()
133 for pos, master := range scenario.masters() {
134 err := master.ClusterAddSlotsRange(ctx, slots[pos], slots[pos+1]-1).Err()
135 if err != nil && slices.Contains(allowErrs, err.Error()) == false {
136 return err
137 }
138 }
139
140
141 for idx, slave := range scenario.slaves() {
142 masterID := scenario.nodeIDs[idx]
143
144
145 err := eventually(func() error {
146 s := slave.ClusterNodes(ctx).Val()
147 wanted := masterID
148 if !strings.Contains(s, wanted) {
149 return fmt.Errorf("%q does not contain %q", s, wanted)
150 }
151 return nil
152 }, 10*time.Second)
153 if err != nil {
154 return err
155 }
156
157 err = slave.ClusterReplicate(ctx, masterID).Err()
158 if err != nil {
159 return err
160 }
161 }
162
163
164 wanted := []redis.ClusterSlot{{
165 Start: 0,
166 End: 5460,
167 Nodes: []redis.ClusterNode{{
168 ID: "",
169 Addr: "127.0.0.1:16600",
170 }, {
171 ID: "",
172 Addr: "127.0.0.1:16603",
173 }},
174 }, {
175 Start: 5461,
176 End: 10922,
177 Nodes: []redis.ClusterNode{{
178 ID: "",
179 Addr: "127.0.0.1:16601",
180 }, {
181 ID: "",
182 Addr: "127.0.0.1:16604",
183 }},
184 }, {
185 Start: 10923,
186 End: 16383,
187 Nodes: []redis.ClusterNode{{
188 ID: "",
189 Addr: "127.0.0.1:16602",
190 }, {
191 ID: "",
192 Addr: "127.0.0.1:16605",
193 }},
194 }}
195
196 for _, client := range scenario.clients {
197 err := eventually(func() error {
198 res, err := client.ClusterSlots(ctx).Result()
199 if err != nil {
200 return err
201 }
202 return assertSlotsEqual(res, wanted)
203 }, 90*time.Second)
204 if err != nil {
205 return err
206 }
207 }
208
209 return nil
210 }
211
212 func collectNodeInformation(ctx context.Context, scenario *clusterScenario) error {
213 for pos, port := range scenario.ports {
214 client := redis.NewClient(&redis.Options{
215 Addr: ":" + port,
216 })
217
218 myID, err := client.ClusterMyID(ctx).Result()
219 if err != nil {
220 return err
221 }
222
223 scenario.clients[port] = client
224 scenario.nodeIDs[pos] = myID
225 }
226 return nil
227 }
228
229 func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
230 outerLoop:
231 for _, s2 := range wanted {
232 for _, s1 := range slots {
233 if slotEqual(s1, s2) {
234 continue outerLoop
235 }
236 }
237 return fmt.Errorf("%v not found in %v", s2, slots)
238 }
239 return nil
240 }
241
242 func slotEqual(s1, s2 redis.ClusterSlot) bool {
243 if s1.Start != s2.Start {
244 return false
245 }
246 if s1.End != s2.End {
247 return false
248 }
249 if len(s1.Nodes) != len(s2.Nodes) {
250 return false
251 }
252 for i, n1 := range s1.Nodes {
253 if n1.Addr != s2.Nodes[i].Addr {
254 return false
255 }
256 }
257 return true
258 }
259
260
261
262 var _ = Describe("ClusterClient", func() {
263 var failover bool
264 var opt *redis.ClusterOptions
265 var client *redis.ClusterClient
266
267 assertClusterClient := func() {
268 It("do", func() {
269 val, err := client.Do(ctx, "ping").Result()
270 Expect(err).NotTo(HaveOccurred())
271 Expect(val).To(Equal("PONG"))
272 })
273
274 It("should GET/SET/DEL", func() {
275 err := client.Get(ctx, "A").Err()
276 Expect(err).To(Equal(redis.Nil))
277
278 err = client.Set(ctx, "A", "VALUE", 0).Err()
279 Expect(err).NotTo(HaveOccurred())
280
281 Eventually(func() string {
282 return client.Get(ctx, "A").Val()
283 }, 30*time.Second).Should(Equal("VALUE"))
284
285 cnt, err := client.Del(ctx, "A").Result()
286 Expect(err).NotTo(HaveOccurred())
287 Expect(cnt).To(Equal(int64(1)))
288 })
289
290 It("GET follows redirects", func() {
291 err := client.Set(ctx, "A", "VALUE", 0).Err()
292 Expect(err).NotTo(HaveOccurred())
293
294 if !failover {
295 Eventually(func() int64 {
296 nodes, err := client.Nodes(ctx, "A")
297 if err != nil {
298 return 0
299 }
300 return nodes[1].Client.DBSize(ctx).Val()
301 }, 30*time.Second).Should(Equal(int64(1)))
302
303 Eventually(func() error {
304 return client.SwapNodes(ctx, "A")
305 }, 30*time.Second).ShouldNot(HaveOccurred())
306 }
307
308 v, err := client.Get(ctx, "A").Result()
309 Expect(err).NotTo(HaveOccurred())
310 Expect(v).To(Equal("VALUE"))
311 })
312
313 It("SET follows redirects", func() {
314 if !failover {
315 Eventually(func() error {
316 return client.SwapNodes(ctx, "A")
317 }, 30*time.Second).ShouldNot(HaveOccurred())
318 }
319
320 err := client.Set(ctx, "A", "VALUE", 0).Err()
321 Expect(err).NotTo(HaveOccurred())
322
323 v, err := client.Get(ctx, "A").Result()
324 Expect(err).NotTo(HaveOccurred())
325 Expect(v).To(Equal("VALUE"))
326 })
327
328 It("distributes keys", func() {
329 for i := 0; i < 100; i++ {
330 err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
331 Expect(err).NotTo(HaveOccurred())
332 }
333
334 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
335 defer GinkgoRecover()
336 Eventually(func() string {
337 return master.Info(ctx, "keyspace").Val()
338 }, 30*time.Second).Should(Or(
339 ContainSubstring("keys=32"),
340 ContainSubstring("keys=36"),
341 ContainSubstring("keys=32"),
342 ))
343 return nil
344 })
345
346 Expect(err).NotTo(HaveOccurred())
347 })
348
349 It("distributes keys when using EVAL", func() {
350 script := redis.NewScript(`
351 local r = redis.call('SET', KEYS[1], ARGV[1])
352 return r
353 `)
354
355 var key string
356 for i := 0; i < 100; i++ {
357 key = fmt.Sprintf("key%d", i)
358 err := script.Run(ctx, client, []string{key}, "value").Err()
359 Expect(err).NotTo(HaveOccurred())
360 }
361
362 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
363 defer GinkgoRecover()
364 Eventually(func() string {
365 return master.Info(ctx, "keyspace").Val()
366 }, 30*time.Second).Should(Or(
367 ContainSubstring("keys=32"),
368 ContainSubstring("keys=36"),
369 ContainSubstring("keys=32"),
370 ))
371 return nil
372 })
373
374 Expect(err).NotTo(HaveOccurred())
375 })
376
377 It("distributes scripts when using Script Load", func() {
378 client.ScriptFlush(ctx)
379
380 script := redis.NewScript(`return 'Unique script'`)
381
382 script.Load(ctx, client)
383
384 err := client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
385 defer GinkgoRecover()
386
387 val, _ := script.Exists(ctx, shard).Result()
388 Expect(val[0]).To(Equal(true))
389 return nil
390 })
391 Expect(err).NotTo(HaveOccurred())
392 })
393
394 It("checks all shards when using Script Exists", func() {
395 client.ScriptFlush(ctx)
396
397 script := redis.NewScript(`return 'First script'`)
398 lostScriptSrc := `return 'Lost script'`
399 lostScript := redis.NewScript(lostScriptSrc)
400
401 script.Load(ctx, client)
402 client.Do(ctx, "script", "load", lostScriptSrc)
403
404 val, _ := client.ScriptExists(ctx, script.Hash(), lostScript.Hash()).Result()
405
406 Expect(val).To(Equal([]bool{true, false}))
407 })
408
409 It("flushes scripts from all shards when using ScriptFlush", func() {
410 script := redis.NewScript(`return 'Unnecessary script'`)
411 script.Load(ctx, client)
412
413 val, _ := client.ScriptExists(ctx, script.Hash()).Result()
414 Expect(val).To(Equal([]bool{true}))
415
416 client.ScriptFlush(ctx)
417
418 val, _ = client.ScriptExists(ctx, script.Hash()).Result()
419 Expect(val).To(Equal([]bool{false}))
420 })
421
422 It("supports Watch", func() {
423 var incr func(string) error
424
425
426 incr = func(key string) error {
427 err := client.Watch(ctx, func(tx *redis.Tx) error {
428 n, err := tx.Get(ctx, key).Int64()
429 if err != nil && err != redis.Nil {
430 return err
431 }
432
433 _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
434 pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
435 return nil
436 })
437 return err
438 }, key)
439 if err == redis.TxFailedErr {
440 return incr(key)
441 }
442 return err
443 }
444
445 var wg sync.WaitGroup
446 for i := 0; i < 100; i++ {
447 wg.Add(1)
448 go func() {
449 defer GinkgoRecover()
450 defer wg.Done()
451
452 err := incr("key")
453 Expect(err).NotTo(HaveOccurred())
454 }()
455 }
456 wg.Wait()
457
458 Eventually(func() string {
459 return client.Get(ctx, "key").Val()
460 }, 30*time.Second).Should(Equal("100"))
461 })
462
463 Describe("pipelining", func() {
464 var pipe *redis.Pipeline
465
466 assertPipeline := func(keys []string) {
467
468 It("follows redirects", func() {
469 if !failover {
470 for _, key := range keys {
471 Eventually(func() error {
472 return client.SwapNodes(ctx, key)
473 }, 30*time.Second).ShouldNot(HaveOccurred())
474 }
475 }
476
477 for i, key := range keys {
478 pipe.Set(ctx, key, key+"_value", 0)
479 pipe.Expire(ctx, key, time.Duration(i+1)*time.Hour)
480 }
481 cmds, err := pipe.Exec(ctx)
482 Expect(err).NotTo(HaveOccurred())
483 Expect(cmds).To(HaveLen(14))
484
485
486 for _, key := range keys {
487 Eventually(func() string {
488 return client.Get(ctx, key).Val()
489 }, 30*time.Second).Should(Equal(key + "_value"))
490 }
491
492 if !failover {
493 for _, key := range keys {
494 Eventually(func() error {
495 return client.SwapNodes(ctx, key)
496 }, 30*time.Second).ShouldNot(HaveOccurred())
497 }
498 }
499
500 for _, key := range keys {
501 pipe.Get(ctx, key)
502 pipe.TTL(ctx, key)
503 }
504 cmds, err = pipe.Exec(ctx)
505 Expect(err).NotTo(HaveOccurred())
506 Expect(cmds).To(HaveLen(14))
507
508 for i, key := range keys {
509 get := cmds[i*2].(*redis.StringCmd)
510 Expect(get.Val()).To(Equal(key + "_value"))
511
512 ttl := cmds[(i*2)+1].(*redis.DurationCmd)
513 dur := time.Duration(i+1) * time.Hour
514 Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
515 }
516 })
517
518 It("works with missing keys", func() {
519 pipe.Set(ctx, "A{s}", "A_value", 0)
520 pipe.Set(ctx, "C{s}", "C_value", 0)
521 _, err := pipe.Exec(ctx)
522 Expect(err).NotTo(HaveOccurred())
523
524 a := pipe.Get(ctx, "A{s}")
525 b := pipe.Get(ctx, "B{s}")
526 c := pipe.Get(ctx, "C{s}")
527 cmds, err := pipe.Exec(ctx)
528 Expect(err).To(Equal(redis.Nil))
529 Expect(cmds).To(HaveLen(3))
530
531 Expect(a.Err()).NotTo(HaveOccurred())
532 Expect(a.Val()).To(Equal("A_value"))
533
534 Expect(b.Err()).To(Equal(redis.Nil))
535 Expect(b.Val()).To(Equal(""))
536
537 Expect(c.Err()).NotTo(HaveOccurred())
538 Expect(c.Val()).To(Equal("C_value"))
539 })
540 }
541
542 Describe("with Pipeline", func() {
543 BeforeEach(func() {
544 pipe = client.Pipeline().(*redis.Pipeline)
545 })
546
547 AfterEach(func() {})
548
549 keys := []string{"A", "B", "C", "D", "E", "F", "G"}
550 assertPipeline(keys)
551
552 It("doesn't fail node with context.Canceled error", func() {
553 ctx, cancel := context.WithCancel(context.Background())
554 cancel()
555 pipe.Set(ctx, "A", "A_value", 0)
556 _, err := pipe.Exec(ctx)
557
558 Expect(err).To(HaveOccurred())
559 Expect(errors.Is(err, context.Canceled)).To(BeTrue())
560
561 clientNodes, _ := client.Nodes(ctx, "A")
562
563 for _, node := range clientNodes {
564 Expect(node.Failing()).To(BeFalse())
565 }
566 })
567
568 It("doesn't fail node with context.DeadlineExceeded error", func() {
569 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
570 defer cancel()
571
572 pipe.Set(ctx, "A", "A_value", 0)
573 _, err := pipe.Exec(ctx)
574
575 Expect(err).To(HaveOccurred())
576 Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
577
578 clientNodes, _ := client.Nodes(ctx, "A")
579
580 for _, node := range clientNodes {
581 Expect(node.Failing()).To(BeFalse())
582 }
583 })
584 })
585
586 Describe("with TxPipeline", func() {
587 BeforeEach(func() {
588 pipe = client.TxPipeline().(*redis.Pipeline)
589 })
590
591 AfterEach(func() {})
592
593
594
595 keys := []string{"A{s}", "B{s}", "C{s}", "D{s}", "E{s}", "F{s}", "G{s}"}
596 assertPipeline(keys)
597
598
599 It("returns CrossSlot error", func() {
600 pipe.Set(ctx, "A{s}", "A_value", 0)
601 pipe.Set(ctx, "B{t}", "B_value", 0)
602 Expect(hashtag.Slot("A{s}")).NotTo(Equal(hashtag.Slot("B{t}")))
603 _, err := pipe.Exec(ctx)
604 Expect(err).To(MatchError(redis.ErrCrossSlot))
605 })
606
607 It("works normally with keyless commands and no CrossSlot error", func() {
608 pipe.Set(ctx, "A{s}", "A_value", 0)
609 pipe.Ping(ctx)
610 pipe.Set(ctx, "B{s}", "B_value", 0)
611 pipe.Ping(ctx)
612 _, err := pipe.Exec(ctx)
613 Expect(err).To(Not(HaveOccurred()))
614 })
615
616
617 It("returns no error when there are no commands", func() {
618 _, err := pipe.Exec(ctx)
619 Expect(err).NotTo(HaveOccurred())
620 })
621 })
622 })
623
624 It("supports PubSub", func() {
625 pubsub := client.Subscribe(ctx, "mychannel")
626 defer pubsub.Close()
627
628 Eventually(func() error {
629 _, err := client.Publish(ctx, "mychannel", "hello").Result()
630 if err != nil {
631 return err
632 }
633
634 msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
635 if err != nil {
636 return err
637 }
638
639 _, ok := msg.(*redis.Message)
640 if !ok {
641 return fmt.Errorf("got %T, wanted *redis.Message", msg)
642 }
643
644 return nil
645 }, 30*time.Second).ShouldNot(HaveOccurred())
646 })
647
648 It("supports PubSub with ReadOnly option", func() {
649 opt = redisClusterOptions()
650 opt.ReadOnly = true
651 client = cluster.newClusterClient(ctx, opt)
652
653 pubsub := client.Subscribe(ctx, "mychannel")
654 defer pubsub.Close()
655
656 Eventually(func() error {
657 var masterPubsubChannels atomic.Int64
658 var slavePubsubChannels atomic.Int64
659
660 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
661 info := master.InfoMap(ctx, "stats")
662 if info.Err() != nil {
663 return info.Err()
664 }
665
666 pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels"))
667 if err != nil {
668 return err
669 }
670
671 masterPubsubChannels.Add(int64(pc))
672
673 return nil
674 })
675 if err != nil {
676 return err
677 }
678
679 err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
680 info := slave.InfoMap(ctx, "stats")
681 if info.Err() != nil {
682 return info.Err()
683 }
684
685 pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels"))
686 if err != nil {
687 return err
688 }
689
690 slavePubsubChannels.Add(int64(pc))
691
692 return nil
693 })
694 if err != nil {
695 return err
696 }
697
698 if c := masterPubsubChannels.Load(); c != int64(0) {
699 return fmt.Errorf("total master pubsub_channels is %d; expected 0", c)
700 }
701
702 if c := slavePubsubChannels.Load(); c != int64(1) {
703 return fmt.Errorf("total slave pubsub_channels is %d; expected 1", c)
704 }
705
706 return nil
707 }, 30*time.Second).ShouldNot(HaveOccurred())
708
709 Eventually(func() error {
710 _, err := client.Publish(ctx, "mychannel", "hello").Result()
711 if err != nil {
712 return err
713 }
714
715 msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
716 if err != nil {
717 return err
718 }
719
720 _, ok := msg.(*redis.Message)
721 if !ok {
722 return fmt.Errorf("got %T, wanted *redis.Message", msg)
723 }
724
725 return nil
726 }, 30*time.Second).ShouldNot(HaveOccurred())
727 })
728
729 It("supports sharded PubSub", func() {
730 pubsub := client.SSubscribe(ctx, "mychannel")
731 defer pubsub.Close()
732
733 Eventually(func() error {
734 _, err := client.SPublish(ctx, "mychannel", "hello").Result()
735 if err != nil {
736 return err
737 }
738
739 msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
740 if err != nil {
741 return err
742 }
743
744 _, ok := msg.(*redis.Message)
745 if !ok {
746 return fmt.Errorf("got %T, wanted *redis.Message", msg)
747 }
748
749 return nil
750 }, 30*time.Second).ShouldNot(HaveOccurred())
751 })
752
753 It("supports sharded PubSub with ReadOnly option", func() {
754 opt = redisClusterOptions()
755 opt.ReadOnly = true
756 client = cluster.newClusterClient(ctx, opt)
757
758 pubsub := client.SSubscribe(ctx, "mychannel")
759 defer pubsub.Close()
760
761 Eventually(func() error {
762 var masterPubsubShardChannels atomic.Int64
763 var slavePubsubShardChannels atomic.Int64
764
765 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
766 info := master.InfoMap(ctx, "stats")
767 if info.Err() != nil {
768 return info.Err()
769 }
770
771 pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels"))
772 if err != nil {
773 return err
774 }
775
776 masterPubsubShardChannels.Add(int64(pc))
777
778 return nil
779 })
780 if err != nil {
781 return err
782 }
783
784 err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
785 info := slave.InfoMap(ctx, "stats")
786 if info.Err() != nil {
787 return info.Err()
788 }
789
790 pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels"))
791 if err != nil {
792 return err
793 }
794
795 slavePubsubShardChannels.Add(int64(pc))
796
797 return nil
798 })
799 if err != nil {
800 return err
801 }
802
803 if c := masterPubsubShardChannels.Load(); c != int64(0) {
804 return fmt.Errorf("total master pubsubshard_channels is %d; expected 0", c)
805 }
806
807 if c := slavePubsubShardChannels.Load(); c != int64(1) {
808 return fmt.Errorf("total slave pubsubshard_channels is %d; expected 1", c)
809 }
810
811 return nil
812 }, 30*time.Second).ShouldNot(HaveOccurred())
813
814 Eventually(func() error {
815 _, err := client.SPublish(ctx, "mychannel", "hello").Result()
816 if err != nil {
817 return err
818 }
819
820 msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
821 if err != nil {
822 return err
823 }
824
825 _, ok := msg.(*redis.Message)
826 if !ok {
827 return fmt.Errorf("got %T, wanted *redis.Message", msg)
828 }
829
830 return nil
831 }, 30*time.Second).ShouldNot(HaveOccurred())
832 })
833
834 It("supports PubSub.Ping without channels", func() {
835 pubsub := client.Subscribe(ctx)
836 defer pubsub.Close()
837
838 err := pubsub.Ping(ctx)
839 Expect(err).NotTo(HaveOccurred())
840 })
841 }
842
843 Describe("ClusterClient PROTO 2", func() {
844 BeforeEach(func() {
845 opt = redisClusterOptions()
846 opt.Protocol = 2
847 client = cluster.newClusterClient(ctx, opt)
848
849 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
850 return master.FlushDB(ctx).Err()
851 })
852 Expect(err).NotTo(HaveOccurred())
853 })
854
855 AfterEach(func() {
856 _ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
857 return master.FlushDB(ctx).Err()
858 })
859 Expect(client.Close()).NotTo(HaveOccurred())
860 })
861
862 It("should CLUSTER PROTO 2", func() {
863 _ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
864 val, err := c.Do(ctx, "HELLO").Result()
865 Expect(err).NotTo(HaveOccurred())
866 Expect(val).Should(ContainElements("proto", int64(2)))
867 return nil
868 })
869 })
870 })
871
872 Describe("ClusterClient", func() {
873 BeforeEach(func() {
874 opt = redisClusterOptions()
875 opt.ClientName = "cluster_hi"
876 client = cluster.newClusterClient(ctx, opt)
877
878 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
879 return master.FlushDB(ctx).Err()
880 })
881 Expect(err).NotTo(HaveOccurred())
882 })
883
884 AfterEach(func() {
885 _ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
886 return master.FlushDB(ctx).Err()
887 })
888 Expect(client.Close()).NotTo(HaveOccurred())
889 })
890
891 It("returns pool stats", func() {
892 stats := client.PoolStats()
893 Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
894 })
895
896 It("returns an error when there are no attempts left", func() {
897 opt := redisClusterOptions()
898 opt.MaxRedirects = -1
899 client := cluster.newClusterClient(ctx, opt)
900
901 Eventually(func() error {
902 return client.SwapNodes(ctx, "A")
903 }, 30*time.Second).ShouldNot(HaveOccurred())
904
905 err := client.Get(ctx, "A").Err()
906 Expect(err).To(HaveOccurred())
907 Expect(err.Error()).To(ContainSubstring("MOVED"))
908
909 Expect(client.Close()).NotTo(HaveOccurred())
910 })
911
912 It("determines hash slots correctly for generic commands", func() {
913 opt := redisClusterOptions()
914 opt.MaxRedirects = -1
915 client := cluster.newClusterClient(ctx, opt)
916
917 err := client.Do(ctx, "GET", "A").Err()
918 Expect(err).To(Equal(redis.Nil))
919
920 err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
921 Expect(err).To(Equal(redis.Nil))
922
923 Eventually(func() error {
924 return client.SwapNodes(ctx, "A")
925 }, 30*time.Second).ShouldNot(HaveOccurred())
926
927 err = client.Do(ctx, "GET", "A").Err()
928 Expect(err).To(HaveOccurred())
929 Expect(err.Error()).To(ContainSubstring("MOVED"))
930
931 err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
932 Expect(err).To(HaveOccurred())
933 Expect(err.Error()).To(ContainSubstring("MOVED"))
934
935 Expect(client.Close()).NotTo(HaveOccurred())
936 })
937
938 It("follows node redirection immediately", func() {
939
940 opt := redisClusterOptions()
941 opt.MinRetryBackoff = 10 * time.Minute
942 opt.MaxRetryBackoff = 20 * time.Minute
943 client := cluster.newClusterClient(ctx, opt)
944
945 Eventually(func() error {
946 return client.SwapNodes(ctx, "A")
947 }, 30*time.Second).ShouldNot(HaveOccurred())
948
949
950
951 redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
952 defer cancel()
953
954 err := client.Set(redirCtx, "A", "VALUE", 0).Err()
955 Expect(err).NotTo(HaveOccurred())
956
957 v, err := client.Get(redirCtx, "A").Result()
958 Expect(err).NotTo(HaveOccurred())
959 Expect(v).To(Equal("VALUE"))
960
961 Expect(client.Close()).NotTo(HaveOccurred())
962 })
963
964 It("calls fn for every master node", func() {
965 for i := 0; i < 10; i++ {
966 Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
967 }
968
969 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
970 return master.FlushDB(ctx).Err()
971 })
972 Expect(err).NotTo(HaveOccurred())
973
974 size, err := client.DBSize(ctx).Result()
975 Expect(err).NotTo(HaveOccurred())
976 Expect(size).To(Equal(int64(0)))
977 })
978
979 It("should CLUSTER SLOTS", func() {
980 res, err := client.ClusterSlots(ctx).Result()
981 Expect(err).NotTo(HaveOccurred())
982 Expect(res).To(HaveLen(3))
983
984 wanted := []redis.ClusterSlot{{
985 Start: 0,
986 End: 5460,
987 Nodes: []redis.ClusterNode{{
988 ID: "",
989 Addr: "127.0.0.1:16600",
990 }, {
991 ID: "",
992 Addr: "127.0.0.1:16603",
993 }},
994 }, {
995 Start: 5461,
996 End: 10922,
997 Nodes: []redis.ClusterNode{{
998 ID: "",
999 Addr: "127.0.0.1:16601",
1000 }, {
1001 ID: "",
1002 Addr: "127.0.0.1:16604",
1003 }},
1004 }, {
1005 Start: 10923,
1006 End: 16383,
1007 Nodes: []redis.ClusterNode{{
1008 ID: "",
1009 Addr: "127.0.0.1:16602",
1010 }, {
1011 ID: "",
1012 Addr: "127.0.0.1:16605",
1013 }},
1014 }}
1015 Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
1016 })
1017
1018 It("should CLUSTER SHARDS", func() {
1019 res, err := client.ClusterShards(ctx).Result()
1020 Expect(err).NotTo(HaveOccurred())
1021 Expect(res).NotTo(BeEmpty())
1022
1023
1024 for _, shard := range res {
1025 Expect(shard.Slots).NotTo(BeEmpty())
1026 for _, slotRange := range shard.Slots {
1027 Expect(slotRange.Start).To(BeNumerically(">=", 0))
1028 Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
1029 }
1030
1031 Expect(shard.Nodes).NotTo(BeEmpty())
1032 for _, node := range shard.Nodes {
1033 Expect(node.ID).NotTo(BeEmpty())
1034 Expect(node.Endpoint).NotTo(BeEmpty())
1035 Expect(node.IP).NotTo(BeEmpty())
1036 Expect(node.Port).To(BeNumerically(">", 0))
1037
1038 validRoles := []string{"master", "slave", "replica"}
1039 Expect(validRoles).To(ContainElement(node.Role))
1040
1041 Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))
1042
1043 validHealthStatuses := []string{"online", "failed", "loading"}
1044 Expect(validHealthStatuses).To(ContainElement(node.Health))
1045 }
1046 }
1047 })
1048
1049 It("should CLUSTER LINKS", func() {
1050 res, err := client.ClusterLinks(ctx).Result()
1051 Expect(err).NotTo(HaveOccurred())
1052 Expect(res).NotTo(BeEmpty())
1053
1054
1055 for _, link := range res {
1056
1057 Expect(link.Direction).NotTo(BeEmpty())
1058 Expect([]string{"from", "to"}).To(ContainElement(link.Direction))
1059 Expect(link.Node).NotTo(BeEmpty())
1060 Expect(link.CreateTime).To(BeNumerically(">", 0))
1061
1062 Expect(link.Events).NotTo(BeEmpty())
1063 validEventChars := []rune{'r', 'w'}
1064 for _, eventChar := range link.Events {
1065 Expect(validEventChars).To(ContainElement(eventChar))
1066 }
1067
1068 Expect(link.SendBufferAllocated).To(BeNumerically(">=", 0))
1069 Expect(link.SendBufferUsed).To(BeNumerically(">=", 0))
1070 }
1071 })
1072
1073 It("should cluster client setname", func() {
1074 err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
1075 return c.Ping(ctx).Err()
1076 })
1077 Expect(err).NotTo(HaveOccurred())
1078
1079 _ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
1080 val, err := c.ClientList(ctx).Result()
1081 Expect(err).NotTo(HaveOccurred())
1082 Expect(val).Should(ContainSubstring("name=cluster_hi"))
1083 return nil
1084 })
1085 })
1086
1087 It("should CLUSTER PROTO 3", func() {
1088 _ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
1089 val, err := c.Do(ctx, "HELLO").Result()
1090 Expect(err).NotTo(HaveOccurred())
1091 Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
1092 return nil
1093 })
1094 })
1095
1096 It("should CLUSTER MYSHARDID", func() {
1097 shardID, err := client.ClusterMyShardID(ctx).Result()
1098 Expect(err).NotTo(HaveOccurred())
1099 Expect(shardID).ToNot(BeEmpty())
1100 })
1101
1102 It("should CLUSTER NODES", func() {
1103 res, err := client.ClusterNodes(ctx).Result()
1104 Expect(err).NotTo(HaveOccurred())
1105 Expect(len(res)).To(BeNumerically(">", 400))
1106 })
1107
1108 It("should CLUSTER INFO", func() {
1109 res, err := client.ClusterInfo(ctx).Result()
1110 Expect(err).NotTo(HaveOccurred())
1111 Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
1112 })
1113
1114 It("should CLUSTER KEYSLOT", func() {
1115 hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
1116 Expect(err).NotTo(HaveOccurred())
1117 Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
1118 })
1119
1120 It("should CLUSTER GETKEYSINSLOT", func() {
1121 keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
1122 Expect(err).NotTo(HaveOccurred())
1123 Expect(len(keys)).To(Equal(0))
1124 })
1125
1126 It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
1127 n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
1128 Expect(err).NotTo(HaveOccurred())
1129 Expect(n).To(Equal(int64(0)))
1130 })
1131
1132 It("should CLUSTER COUNTKEYSINSLOT", func() {
1133 n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
1134 Expect(err).NotTo(HaveOccurred())
1135 Expect(n).To(Equal(int64(0)))
1136 })
1137
1138 It("should CLUSTER SAVECONFIG", func() {
1139 res, err := client.ClusterSaveConfig(ctx).Result()
1140 Expect(err).NotTo(HaveOccurred())
1141 Expect(res).To(Equal("OK"))
1142 })
1143
1144 It("should CLUSTER SLAVES", func() {
1145 nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
1146 Expect(err).NotTo(HaveOccurred())
1147 Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
1148 Expect(nodesList).Should(HaveLen(1))
1149 })
1150
1151 It("should RANDOMKEY", func() {
1152 const nkeys = 100
1153
1154 for i := 0; i < nkeys; i++ {
1155 err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
1156 Expect(err).NotTo(HaveOccurred())
1157 }
1158
1159 var keys []string
1160 addKey := func(key string) {
1161 for _, k := range keys {
1162 if k == key {
1163 return
1164 }
1165 }
1166 keys = append(keys, key)
1167 }
1168
1169 for i := 0; i < nkeys*10; i++ {
1170 key := client.RandomKey(ctx).Val()
1171 addKey(key)
1172 }
1173
1174 Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
1175 })
1176
1177 It("supports Process hook", func() {
1178 testCtx, cancel := context.WithCancel(ctx)
1179 defer cancel()
1180
1181 err := client.Ping(ctx).Err()
1182 Expect(err).NotTo(HaveOccurred())
1183
1184 err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
1185 return node.Ping(ctx).Err()
1186 })
1187 Expect(err).NotTo(HaveOccurred())
1188
1189 var stack []string
1190
1191 clusterHook := &hook{
1192 processHook: func(hook redis.ProcessHook) redis.ProcessHook {
1193 return func(ctx context.Context, cmd redis.Cmder) error {
1194 select {
1195 case <-testCtx.Done():
1196 return hook(ctx, cmd)
1197 default:
1198 }
1199
1200 Expect(cmd.String()).To(Equal("ping: "))
1201 stack = append(stack, "cluster.BeforeProcess")
1202
1203 err := hook(ctx, cmd)
1204
1205 Expect(cmd.String()).To(Equal("ping: PONG"))
1206 stack = append(stack, "cluster.AfterProcess")
1207
1208 return err
1209 }
1210 },
1211 }
1212 client.AddHook(clusterHook)
1213
1214 nodeHook := &hook{
1215 processHook: func(hook redis.ProcessHook) redis.ProcessHook {
1216 return func(ctx context.Context, cmd redis.Cmder) error {
1217 select {
1218 case <-testCtx.Done():
1219 return hook(ctx, cmd)
1220 default:
1221 }
1222
1223 Expect(cmd.String()).To(Equal("ping: "))
1224 stack = append(stack, "shard.BeforeProcess")
1225
1226 err := hook(ctx, cmd)
1227
1228 Expect(cmd.String()).To(Equal("ping: PONG"))
1229 stack = append(stack, "shard.AfterProcess")
1230
1231 return err
1232 }
1233 },
1234 }
1235
1236 _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
1237 node.AddHook(nodeHook)
1238 return nil
1239 })
1240
1241 err = client.Ping(ctx).Err()
1242 Expect(err).NotTo(HaveOccurred())
1243 Expect(stack).To(Equal([]string{
1244 "cluster.BeforeProcess",
1245 "shard.BeforeProcess",
1246 "shard.AfterProcess",
1247 "cluster.AfterProcess",
1248 }))
1249 })
1250
1251 It("supports Pipeline hook", func() {
1252 err := client.Ping(ctx).Err()
1253 Expect(err).NotTo(HaveOccurred())
1254
1255 err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
1256 return node.Ping(ctx).Err()
1257 })
1258 Expect(err).NotTo(HaveOccurred())
1259
1260 var stack []string
1261
1262 client.AddHook(&hook{
1263 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
1264 return func(ctx context.Context, cmds []redis.Cmder) error {
1265 Expect(cmds).To(HaveLen(1))
1266 Expect(cmds[0].String()).To(Equal("ping: "))
1267 stack = append(stack, "cluster.BeforeProcessPipeline")
1268
1269 err := hook(ctx, cmds)
1270
1271 Expect(cmds).To(HaveLen(1))
1272 Expect(cmds[0].String()).To(Equal("ping: PONG"))
1273 stack = append(stack, "cluster.AfterProcessPipeline")
1274
1275 return err
1276 }
1277 },
1278 })
1279
1280 _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
1281 node.AddHook(&hook{
1282 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
1283 return func(ctx context.Context, cmds []redis.Cmder) error {
1284 Expect(cmds).To(HaveLen(1))
1285 Expect(cmds[0].String()).To(Equal("ping: "))
1286 stack = append(stack, "shard.BeforeProcessPipeline")
1287
1288 err := hook(ctx, cmds)
1289
1290 Expect(cmds).To(HaveLen(1))
1291 Expect(cmds[0].String()).To(Equal("ping: PONG"))
1292 stack = append(stack, "shard.AfterProcessPipeline")
1293
1294 return err
1295 }
1296 },
1297 })
1298 return nil
1299 })
1300
1301 _, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
1302 pipe.Ping(ctx)
1303 return nil
1304 })
1305 Expect(err).NotTo(HaveOccurred())
1306 Expect(stack).To(Equal([]string{
1307 "cluster.BeforeProcessPipeline",
1308 "shard.BeforeProcessPipeline",
1309 "shard.AfterProcessPipeline",
1310 "cluster.AfterProcessPipeline",
1311 }))
1312 })
1313
1314 It("supports TxPipeline hook", func() {
1315 err := client.Ping(ctx).Err()
1316 Expect(err).NotTo(HaveOccurred())
1317
1318 err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
1319 return node.Ping(ctx).Err()
1320 })
1321 Expect(err).NotTo(HaveOccurred())
1322
1323 var stack []string
1324
1325 client.AddHook(&hook{
1326 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
1327 return func(ctx context.Context, cmds []redis.Cmder) error {
1328 Expect(cmds).To(HaveLen(3))
1329 Expect(cmds[1].String()).To(Equal("ping: "))
1330 stack = append(stack, "cluster.BeforeProcessPipeline")
1331
1332 err := hook(ctx, cmds)
1333
1334 Expect(cmds).To(HaveLen(3))
1335 Expect(cmds[1].String()).To(Equal("ping: PONG"))
1336 stack = append(stack, "cluster.AfterProcessPipeline")
1337
1338 return err
1339 }
1340 },
1341 })
1342
1343 _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
1344 node.AddHook(&hook{
1345 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
1346 return func(ctx context.Context, cmds []redis.Cmder) error {
1347 Expect(cmds).To(HaveLen(3))
1348 Expect(cmds[1].String()).To(Equal("ping: "))
1349 stack = append(stack, "shard.BeforeProcessPipeline")
1350
1351 err := hook(ctx, cmds)
1352
1353 Expect(cmds).To(HaveLen(3))
1354 Expect(cmds[1].String()).To(Equal("ping: PONG"))
1355 stack = append(stack, "shard.AfterProcessPipeline")
1356
1357 return err
1358 }
1359 },
1360 })
1361 return nil
1362 })
1363
1364 _, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
1365 pipe.Ping(ctx)
1366 return nil
1367 })
1368 Expect(err).NotTo(HaveOccurred())
1369 Expect(stack).To(Equal([]string{
1370 "cluster.BeforeProcessPipeline",
1371 "shard.BeforeProcessPipeline",
1372 "shard.AfterProcessPipeline",
1373 "cluster.AfterProcessPipeline",
1374 }))
1375 })
1376
1377 It("should return correct replica for key", func() {
1378 client, err := client.SlaveForKey(ctx, "test")
1379 Expect(err).ToNot(HaveOccurred())
1380 info := client.Info(ctx, "server")
1381 Expect(info.Val()).Should(ContainSubstring("tcp_port:16604"))
1382 })
1383
1384 It("should return correct master for key", func() {
1385 client, err := client.MasterForKey(ctx, "test")
1386 Expect(err).ToNot(HaveOccurred())
1387 info := client.Info(ctx, "server")
1388 Expect(info.Val()).Should(ContainSubstring("tcp_port:16601"))
1389 })
1390
1391 assertClusterClient()
1392 })
1393
1394 Describe("ClusterClient with RouteByLatency", func() {
1395 BeforeEach(func() {
1396 opt = redisClusterOptions()
1397 opt.RouteByLatency = true
1398 client = cluster.newClusterClient(ctx, opt)
1399
1400 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
1401 return master.FlushDB(ctx).Err()
1402 })
1403 Expect(err).NotTo(HaveOccurred())
1404
1405 err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
1406 Eventually(func() int64 {
1407 return client.DBSize(ctx).Val()
1408 }, 30*time.Second).Should(Equal(int64(0)))
1409 return nil
1410 })
1411 Expect(err).NotTo(HaveOccurred())
1412 })
1413
1414 AfterEach(func() {
1415 err := client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
1416 return slave.ReadWrite(ctx).Err()
1417 })
1418 Expect(err).NotTo(HaveOccurred())
1419
1420 err = client.Close()
1421 Expect(err).NotTo(HaveOccurred())
1422 })
1423
1424 assertClusterClient()
1425 })
1426
1427 Describe("ClusterClient with ClusterSlots", func() {
1428 BeforeEach(func() {
1429 failover = true
1430
1431 opt = redisClusterOptions()
1432 opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
1433 slots := []redis.ClusterSlot{{
1434 Start: 0,
1435 End: 5460,
1436 Nodes: []redis.ClusterNode{{
1437 Addr: ":" + ringShard1Port,
1438 }},
1439 }, {
1440 Start: 5461,
1441 End: 10922,
1442 Nodes: []redis.ClusterNode{{
1443 Addr: ":" + ringShard2Port,
1444 }},
1445 }, {
1446 Start: 10923,
1447 End: 16383,
1448 Nodes: []redis.ClusterNode{{
1449 Addr: ":" + ringShard3Port,
1450 }},
1451 }}
1452 return slots, nil
1453 }
1454 client = cluster.newClusterClient(ctx, opt)
1455
1456 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
1457 return master.FlushDB(ctx).Err()
1458 })
1459 Expect(err).NotTo(HaveOccurred())
1460
1461 err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
1462 Eventually(func() int64 {
1463 return client.DBSize(ctx).Val()
1464 }, 30*time.Second).Should(Equal(int64(0)))
1465 return nil
1466 })
1467 Expect(err).NotTo(HaveOccurred())
1468 })
1469
1470 AfterEach(func() {
1471 failover = false
1472
1473 err := client.Close()
1474 Expect(err).NotTo(HaveOccurred())
1475 })
1476
1477 assertClusterClient()
1478 })
1479
1480 Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
1481 BeforeEach(func() {
1482 failover = true
1483
1484 opt = redisClusterOptions()
1485 opt.RouteRandomly = true
1486 opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
1487 slots := []redis.ClusterSlot{{
1488 Start: 0,
1489 End: 5460,
1490 Nodes: []redis.ClusterNode{{
1491 Addr: ":" + ringShard1Port,
1492 }},
1493 }, {
1494 Start: 5461,
1495 End: 10922,
1496 Nodes: []redis.ClusterNode{{
1497 Addr: ":" + ringShard2Port,
1498 }},
1499 }, {
1500 Start: 10923,
1501 End: 16383,
1502 Nodes: []redis.ClusterNode{{
1503 Addr: ":" + ringShard3Port,
1504 }},
1505 }}
1506 return slots, nil
1507 }
1508 client = cluster.newClusterClient(ctx, opt)
1509
1510 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
1511 return master.FlushDB(ctx).Err()
1512 })
1513 Expect(err).NotTo(HaveOccurred())
1514
1515 err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
1516 Eventually(func() int64 {
1517 return client.DBSize(ctx).Val()
1518 }, 30*time.Second).Should(Equal(int64(0)))
1519 return nil
1520 })
1521 Expect(err).NotTo(HaveOccurred())
1522 })
1523
1524 AfterEach(func() {
1525 failover = false
1526
1527 err := client.Close()
1528 Expect(err).NotTo(HaveOccurred())
1529 })
1530
1531 assertClusterClient()
1532 })
1533
1534 Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
1535 BeforeEach(func() {
1536 failover = true
1537
1538 opt = redisClusterOptions()
1539 opt.ReadOnly = true
1540 opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
1541 slots := []redis.ClusterSlot{{
1542 Start: 0,
1543 End: 5460,
1544 Nodes: []redis.ClusterNode{{
1545 Addr: ":16600",
1546 }, {
1547 Addr: ":16603",
1548 }},
1549 }, {
1550 Start: 5461,
1551 End: 10922,
1552 Nodes: []redis.ClusterNode{{
1553 Addr: ":16601",
1554 }, {
1555 Addr: ":16604",
1556 }},
1557 }, {
1558 Start: 10923,
1559 End: 16383,
1560 Nodes: []redis.ClusterNode{{
1561 Addr: ":16602",
1562 }, {
1563 Addr: ":16605",
1564 }},
1565 }}
1566 return slots, nil
1567 }
1568 client = cluster.newClusterClient(ctx, opt)
1569
1570 err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
1571 return master.FlushDB(ctx).Err()
1572 })
1573 Expect(err).NotTo(HaveOccurred())
1574
1575 err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
1576 Eventually(func() int64 {
1577 return client.DBSize(ctx).Val()
1578 }, 30*time.Second).Should(Equal(int64(0)))
1579 return nil
1580 })
1581 Expect(err).NotTo(HaveOccurred())
1582 })
1583
1584 AfterEach(func() {
1585 failover = false
1586
1587 err := client.Close()
1588 Expect(err).NotTo(HaveOccurred())
1589 })
1590
1591 assertClusterClient()
1592 })
1593 })
1594
1595 var _ = Describe("ClusterClient without nodes", func() {
1596 var client *redis.ClusterClient
1597
1598 BeforeEach(func() {
1599 client = redis.NewClusterClient(&redis.ClusterOptions{})
1600 })
1601
1602 AfterEach(func() {
1603 Expect(client.Close()).NotTo(HaveOccurred())
1604 })
1605
1606 It("Ping returns an error", func() {
1607 err := client.Ping(ctx).Err()
1608 Expect(err).To(MatchError("redis: cluster has no nodes"))
1609 })
1610
1611 It("pipeline returns an error", func() {
1612 _, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
1613 pipe.Ping(ctx)
1614 return nil
1615 })
1616 Expect(err).To(MatchError("redis: cluster has no nodes"))
1617 })
1618 })
1619
1620 var _ = Describe("ClusterClient without valid nodes", func() {
1621 var client *redis.ClusterClient
1622
1623 BeforeEach(func() {
1624 client = redis.NewClusterClient(&redis.ClusterOptions{
1625 Addrs: []string{redisAddr},
1626 })
1627 })
1628
1629 AfterEach(func() {
1630 Expect(client.Close()).NotTo(HaveOccurred())
1631 })
1632
1633 It("returns an error", func() {
1634 err := client.Ping(ctx).Err()
1635 Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
1636 })
1637
1638 It("pipeline returns an error", func() {
1639 _, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
1640 pipe.Ping(ctx)
1641 return nil
1642 })
1643 Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
1644 })
1645 })
1646
1647 var _ = Describe("ClusterClient with unavailable Cluster", func() {
1648 var client *redis.ClusterClient
1649
1650 BeforeEach(func() {
1651 opt := redisClusterOptions()
1652 opt.ReadTimeout = 250 * time.Millisecond
1653 opt.WriteTimeout = 250 * time.Millisecond
1654 opt.MaxRedirects = 1
1655 client = cluster.newClusterClientUnstable(opt)
1656 Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
1657
1658 for _, node := range cluster.clients {
1659 err := node.ClientPause(ctx, 5*time.Second).Err()
1660 Expect(err).NotTo(HaveOccurred())
1661 }
1662 })
1663
1664 AfterEach(func() {
1665 Expect(client.Close()).NotTo(HaveOccurred())
1666 })
1667
1668 It("recovers when Cluster recovers", func() {
1669 err := client.Ping(ctx).Err()
1670 Expect(err).To(HaveOccurred())
1671
1672 Eventually(func() error {
1673 return client.Ping(ctx).Err()
1674 }, "30s").ShouldNot(HaveOccurred())
1675 })
1676 })
1677
1678 var _ = Describe("ClusterClient timeout", func() {
1679 var client *redis.ClusterClient
1680
1681 AfterEach(func() {
1682 _ = client.Close()
1683 })
1684
1685 testTimeout := func() {
1686 It("Ping timeouts", func() {
1687 err := client.Ping(ctx).Err()
1688 Expect(err).To(HaveOccurred())
1689 Expect(err.(net.Error).Timeout()).To(BeTrue())
1690 })
1691
1692 It("Pipeline timeouts", func() {
1693 _, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
1694 pipe.Ping(ctx)
1695 return nil
1696 })
1697 Expect(err).To(HaveOccurred())
1698 Expect(err.(net.Error).Timeout()).To(BeTrue())
1699 })
1700
1701 It("Tx timeouts", func() {
1702 err := client.Watch(ctx, func(tx *redis.Tx) error {
1703 return tx.Ping(ctx).Err()
1704 }, "foo")
1705 Expect(err).To(HaveOccurred())
1706 Expect(err.(net.Error).Timeout()).To(BeTrue())
1707 })
1708
1709 It("Tx Pipeline timeouts", func() {
1710 err := client.Watch(ctx, func(tx *redis.Tx) error {
1711 _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
1712 pipe.Ping(ctx)
1713 return nil
1714 })
1715 return err
1716 }, "foo")
1717 Expect(err).To(HaveOccurred())
1718 Expect(err.(net.Error).Timeout()).To(BeTrue())
1719 })
1720 }
1721
1722 const pause = 5 * time.Second
1723
1724 Context("read/write timeout", func() {
1725 BeforeEach(func() {
1726 opt := redisClusterOptions()
1727 client = cluster.newClusterClient(ctx, opt)
1728
1729 err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
1730 err := client.ClientPause(ctx, pause).Err()
1731
1732 opt := client.Options()
1733 opt.ReadTimeout = time.Nanosecond
1734 opt.WriteTimeout = time.Nanosecond
1735
1736 return err
1737 })
1738 Expect(err).NotTo(HaveOccurred())
1739
1740
1741 opt.ReadTimeout = time.Nanosecond
1742 opt.WriteTimeout = time.Nanosecond
1743 opt.MaxRedirects = 0
1744 })
1745
1746 AfterEach(func() {
1747 _ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
1748 defer GinkgoRecover()
1749
1750 opt := client.Options()
1751 opt.ReadTimeout = time.Second
1752 opt.WriteTimeout = time.Second
1753
1754 Eventually(func() error {
1755 return client.Ping(ctx).Err()
1756 }, 2*pause).ShouldNot(HaveOccurred())
1757 return nil
1758 })
1759
1760 err := client.Close()
1761 Expect(err).NotTo(HaveOccurred())
1762 })
1763
1764 testTimeout()
1765 })
1766 })
1767
1768 var _ = Describe("ClusterClient ParseURL", func() {
1769 cases := []struct {
1770 test string
1771 url string
1772 o *redis.ClusterOptions
1773 err error
1774 }{
1775 {
1776 test: "ParseRedisURL",
1777 url: "redis://localhost:123",
1778 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
1779 }, {
1780 test: "ParseRedissURL",
1781 url: "rediss://localhost:123",
1782 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
1783 }, {
1784 test: "MissingRedisPort",
1785 url: "redis://localhost",
1786 o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
1787 }, {
1788 test: "MissingRedissPort",
1789 url: "rediss://localhost",
1790 o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
1791 }, {
1792 test: "MultipleRedisURLs",
1793 url: "redis://localhost:123?addr=localhost:1234&addr=localhost:12345",
1794 o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}},
1795 }, {
1796 test: "MultipleRedissURLs",
1797 url: "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345",
1798 o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
1799 }, {
1800 test: "OnlyPassword",
1801 url: "redis://:bar@localhost:123",
1802 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
1803 }, {
1804 test: "OnlyUser",
1805 url: "redis://foo@localhost:123",
1806 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
1807 }, {
1808 test: "RedisUsernamePassword",
1809 url: "redis://foo:bar@localhost:123",
1810 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
1811 }, {
1812 test: "RedissUsernamePassword",
1813 url: "rediss://foo:bar@localhost:123?addr=localhost:1234",
1814 o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ServerName: "localhost"}},
1815 }, {
1816 test: "QueryParameters",
1817 url: "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234",
1818 o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
1819 }, {
1820 test: "DisabledTimeout",
1821 url: "redis://localhost:123?conn_max_idle_time=0",
1822 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
1823 }, {
1824 test: "DisabledTimeoutNeg",
1825 url: "redis://localhost:123?conn_max_idle_time=-1",
1826 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
1827 }, {
1828 test: "UseDefault",
1829 url: "redis://localhost:123?conn_max_idle_time=",
1830 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
1831 }, {
1832 test: "FailingTimeoutSeconds",
1833 url: "redis://localhost:123?failing_timeout_seconds=25",
1834 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, FailingTimeoutSeconds: 25},
1835 }, {
1836 test: "Protocol",
1837 url: "redis://localhost:123?protocol=2",
1838 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Protocol: 2},
1839 }, {
1840 test: "ClientName",
1841 url: "redis://localhost:123?client_name=cluster_hi",
1842 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
1843 }, {
1844 test: "UseDefaultMissing=",
1845 url: "redis://localhost:123?conn_max_idle_time",
1846 o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
1847 }, {
1848 test: "InvalidQueryAddr",
1849 url: "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234",
1850 err: errors.New(`redis: unable to parse addr param: rediss://foo:barr@localhost:1234`),
1851 }, {
1852 test: "InvalidInt",
1853 url: "redis://localhost?pool_size=five",
1854 err: errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
1855 }, {
1856 test: "InvalidBool",
1857 url: "redis://localhost?pool_fifo=yes",
1858 err: errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
1859 }, {
1860 test: "UnknownParam",
1861 url: "redis://localhost?abc=123",
1862 err: errors.New("redis: unexpected option: abc"),
1863 }, {
1864 test: "InvalidScheme",
1865 url: "https://google.com",
1866 err: errors.New("redis: invalid URL scheme: https"),
1867 },
1868 }
1869
1870 It("match ParseClusterURL", func() {
1871 for i := range cases {
1872 tc := cases[i]
1873 actual, err := redis.ParseClusterURL(tc.url)
1874 if tc.err != nil {
1875 Expect(err).Should(MatchError(tc.err))
1876 } else {
1877 Expect(err).NotTo(HaveOccurred())
1878 }
1879
1880 if err == nil {
1881 Expect(tc.o).NotTo(BeNil())
1882
1883 Expect(tc.o.Addrs).To(Equal(actual.Addrs))
1884 Expect(tc.o.TLSConfig).To(Equal(actual.TLSConfig))
1885 Expect(tc.o.Username).To(Equal(actual.Username))
1886 Expect(tc.o.Password).To(Equal(actual.Password))
1887 Expect(tc.o.MaxRetries).To(Equal(actual.MaxRetries))
1888 Expect(tc.o.MinRetryBackoff).To(Equal(actual.MinRetryBackoff))
1889 Expect(tc.o.MaxRetryBackoff).To(Equal(actual.MaxRetryBackoff))
1890 Expect(tc.o.DialTimeout).To(Equal(actual.DialTimeout))
1891 Expect(tc.o.ReadTimeout).To(Equal(actual.ReadTimeout))
1892 Expect(tc.o.WriteTimeout).To(Equal(actual.WriteTimeout))
1893 Expect(tc.o.PoolFIFO).To(Equal(actual.PoolFIFO))
1894 Expect(tc.o.PoolSize).To(Equal(actual.PoolSize))
1895 Expect(tc.o.MinIdleConns).To(Equal(actual.MinIdleConns))
1896 Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime))
1897 Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime))
1898 Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout))
1899 Expect(tc.o.FailingTimeoutSeconds).To(Equal(actual.FailingTimeoutSeconds))
1900 }
1901 }
1902 })
1903 })
1904
1905 var _ = Describe("ClusterClient FailingTimeoutSeconds", func() {
1906 var client *redis.ClusterClient
1907
1908 AfterEach(func() {
1909 if client != nil {
1910 _ = client.Close()
1911 }
1912 })
1913
1914 It("should use default failing timeout of 15 seconds", func() {
1915 opt := redisClusterOptions()
1916 client = cluster.newClusterClient(ctx, opt)
1917
1918
1919 Expect(opt.FailingTimeoutSeconds).To(Equal(15))
1920 })
1921
1922 It("should use custom failing timeout", func() {
1923 opt := redisClusterOptions()
1924 opt.FailingTimeoutSeconds = 30
1925 client = cluster.newClusterClient(ctx, opt)
1926
1927
1928 Expect(opt.FailingTimeoutSeconds).To(Equal(30))
1929 })
1930
1931 It("should parse failing_timeout_seconds from URL", func() {
1932 url := "redis://localhost:16600?failing_timeout_seconds=25"
1933 opt, err := redis.ParseClusterURL(url)
1934 Expect(err).NotTo(HaveOccurred())
1935 Expect(opt.FailingTimeoutSeconds).To(Equal(25))
1936 })
1937
1938 It("should handle node failing timeout correctly", func() {
1939 opt := redisClusterOptions()
1940 opt.FailingTimeoutSeconds = 2
1941 client = cluster.newClusterClient(ctx, opt)
1942
1943
1944 nodes, err := client.Nodes(ctx, "A")
1945 Expect(err).NotTo(HaveOccurred())
1946 Expect(len(nodes)).To(BeNumerically(">", 0))
1947
1948 node := nodes[0]
1949
1950
1951 Expect(node.Failing()).To(BeFalse())
1952
1953
1954 node.MarkAsFailing()
1955 Expect(node.Failing()).To(BeTrue())
1956
1957
1958 time.Sleep(1 * time.Second)
1959 Expect(node.Failing()).To(BeTrue())
1960
1961
1962 time.Sleep(2 * time.Second)
1963 Expect(node.Failing()).To(BeFalse())
1964 })
1965
1966 It("should handle zero timeout by using default", func() {
1967 opt := redisClusterOptions()
1968 opt.FailingTimeoutSeconds = 0
1969 client = cluster.newClusterClient(ctx, opt)
1970
1971
1972 Expect(opt.FailingTimeoutSeconds).To(Equal(15))
1973 })
1974 })
1975
View as plain text