1 package redis_test
2
3 import (
4 "context"
5 "crypto/rand"
6 "fmt"
7 "net"
8 "strconv"
9 "sync"
10 "time"
11
12 . "github.com/bsm/ginkgo/v2"
13 . "github.com/bsm/gomega"
14
15 "github.com/redis/go-redis/v9"
16 )
17
18 var _ = Describe("Redis Ring PROTO 2", func() {
19 const heartbeat = 100 * time.Millisecond
20
21 var ring *redis.Ring
22
23 BeforeEach(func() {
24 opt := redisRingOptions()
25 opt.Protocol = 2
26 opt.HeartbeatFrequency = heartbeat
27 ring = redis.NewRing(opt)
28
29 err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
30 return cl.FlushDB(ctx).Err()
31 })
32 Expect(err).NotTo(HaveOccurred())
33 })
34
35 AfterEach(func() {
36 Expect(ring.Close()).NotTo(HaveOccurred())
37 })
38
39 It("should ring PROTO 2", func() {
40 _ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
41 val, err := c.Do(ctx, "HELLO").Result()
42 Expect(err).NotTo(HaveOccurred())
43 Expect(val).Should(ContainElements("proto", int64(2)))
44 return nil
45 })
46 })
47 })
48
49 var _ = Describe("Redis Ring", func() {
50 const heartbeat = 100 * time.Millisecond
51
52 var ring *redis.Ring
53
54 setRingKeys := func() {
55 for i := 0; i < 100; i++ {
56 err := ring.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
57 Expect(err).NotTo(HaveOccurred())
58 }
59 }
60
61 BeforeEach(func() {
62 opt := redisRingOptions()
63 opt.ClientName = "ring_hi"
64 opt.HeartbeatFrequency = heartbeat
65 ring = redis.NewRing(opt)
66
67 err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
68 return cl.FlushDB(ctx).Err()
69 })
70 Expect(err).NotTo(HaveOccurred())
71 })
72
73 AfterEach(func() {
74 Expect(ring.Close()).NotTo(HaveOccurred())
75 })
76
77 It("do", func() {
78 val, err := ring.Do(ctx, "ping").Result()
79 Expect(err).NotTo(HaveOccurred())
80 Expect(val).To(Equal("PONG"))
81 })
82
83 It("supports context", func() {
84 ctx, cancel := context.WithCancel(ctx)
85 cancel()
86
87 err := ring.Ping(ctx).Err()
88 Expect(err).To(MatchError("context canceled"))
89 })
90
91 It("should ring client setname", func() {
92 err := ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
93 return c.Ping(ctx).Err()
94 })
95 Expect(err).NotTo(HaveOccurred())
96
97 _ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
98 val, err := c.ClientList(ctx).Result()
99 Expect(err).NotTo(HaveOccurred())
100 Expect(val).Should(ContainSubstring("name=ring_hi"))
101 return nil
102 })
103 })
104
105 It("should ring PROTO 3", func() {
106 _ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
107 val, err := c.Do(ctx, "HELLO").Result()
108 Expect(err).NotTo(HaveOccurred())
109 Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
110 return nil
111 })
112 })
113
114 It("distributes keys", func() {
115 setRingKeys()
116
117
118 Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56"))
119 Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
120 })
121
122 It("distributes keys when using EVAL", func() {
123 script := redis.NewScript(`
124 local r = redis.call('SET', KEYS[1], ARGV[1])
125 return r
126 `)
127
128 var key string
129 for i := 0; i < 100; i++ {
130 key = fmt.Sprintf("key%d", i)
131 err := script.Run(ctx, ring, []string{key}, "value").Err()
132 Expect(err).NotTo(HaveOccurred())
133 }
134
135 Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56"))
136 Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
137 })
138
139 It("supports hash tags", func() {
140 for i := 0; i < 100; i++ {
141 err := ring.Set(ctx, fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
142 Expect(err).NotTo(HaveOccurred())
143 }
144
145 Expect(ringShard1.Info(ctx, "keyspace").Val()).ToNot(ContainSubstring("keys="))
146 Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
147 })
148
149 Describe("[new] dynamic setting ring shards", func() {
150 It("downscale shard and check reuse shard, upscale shard and check reuse", func() {
151 Expect(ring.Len(), 2)
152
153 wantShard := ring.ShardByName("ringShardOne")
154 ring.SetAddrs(map[string]string{
155 "ringShardOne": ":" + ringShard1Port,
156 })
157 Expect(ring.Len(), 1)
158 gotShard := ring.ShardByName("ringShardOne")
159 Expect(gotShard).To(BeIdenticalTo(wantShard))
160
161 ring.SetAddrs(map[string]string{
162 "ringShardOne": ":" + ringShard1Port,
163 "ringShardTwo": ":" + ringShard2Port,
164 })
165 Expect(ring.Len(), 2)
166 gotShard = ring.ShardByName("ringShardOne")
167 Expect(gotShard).To(BeIdenticalTo(wantShard))
168 })
169
170 It("uses 3 shards after setting it to 3 shards", func() {
171 Expect(ring.Len(), 2)
172
173 shardName1 := "ringShardOne"
174 shardAddr1 := ":" + ringShard1Port
175 wantShard1 := ring.ShardByName(shardName1)
176 shardName2 := "ringShardTwo"
177 shardAddr2 := ":" + ringShard2Port
178 wantShard2 := ring.ShardByName(shardName2)
179 shardName3 := "ringShardThree"
180 shardAddr3 := ":" + ringShard3Port
181
182 ring.SetAddrs(map[string]string{
183 shardName1: shardAddr1,
184 shardName2: shardAddr2,
185 shardName3: shardAddr3,
186 })
187 Expect(ring.Len(), 3)
188 gotShard1 := ring.ShardByName(shardName1)
189 gotShard2 := ring.ShardByName(shardName2)
190 gotShard3 := ring.ShardByName(shardName3)
191 Expect(gotShard1).To(BeIdenticalTo(wantShard1))
192 Expect(gotShard2).To(BeIdenticalTo(wantShard2))
193 Expect(gotShard3).ToNot(BeNil())
194
195 ring.SetAddrs(map[string]string{
196 shardName1: shardAddr1,
197 shardName2: shardAddr2,
198 })
199 Expect(ring.Len(), 2)
200 gotShard1 = ring.ShardByName(shardName1)
201 gotShard2 = ring.ShardByName(shardName2)
202 gotShard3 = ring.ShardByName(shardName3)
203 Expect(gotShard1).To(BeIdenticalTo(wantShard1))
204 Expect(gotShard2).To(BeIdenticalTo(wantShard2))
205 Expect(gotShard3).To(BeNil())
206 })
207 })
208 Describe("pipeline", func() {
209 It("doesn't panic closed ring, returns error", func() {
210 pipe := ring.Pipeline()
211 for i := 0; i < 3; i++ {
212 err := pipe.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
213 Expect(err).NotTo(HaveOccurred())
214 }
215
216 Expect(ring.Close()).NotTo(HaveOccurred())
217
218 Expect(func() {
219 _, execErr := pipe.Exec(ctx)
220 Expect(execErr).To(HaveOccurred())
221 }).NotTo(Panic())
222 })
223
224 It("distributes keys", func() {
225 pipe := ring.Pipeline()
226 for i := 0; i < 100; i++ {
227 err := pipe.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
228 Expect(err).NotTo(HaveOccurred())
229 }
230 cmds, err := pipe.Exec(ctx)
231 Expect(err).NotTo(HaveOccurred())
232 Expect(cmds).To(HaveLen(100))
233
234 for _, cmd := range cmds {
235 Expect(cmd.Err()).NotTo(HaveOccurred())
236 Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
237 }
238
239
240 Expect(ringShard1.Info(ctx).Val()).To(ContainSubstring("keys=56"))
241 Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=44"))
242 })
243
244 It("is consistent with ring", func() {
245 var keys []string
246 for i := 0; i < 100; i++ {
247 key := make([]byte, 64)
248 _, err := rand.Read(key)
249 Expect(err).NotTo(HaveOccurred())
250 keys = append(keys, string(key))
251 }
252
253 _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
254 for _, key := range keys {
255 pipe.Set(ctx, key, "value", 0).Err()
256 }
257 return nil
258 })
259 Expect(err).NotTo(HaveOccurred())
260
261 for _, key := range keys {
262 val, err := ring.Get(ctx, key).Result()
263 Expect(err).NotTo(HaveOccurred())
264 Expect(val).To(Equal("value"))
265 }
266 })
267
268 It("supports hash tags", func() {
269 _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
270 for i := 0; i < 100; i++ {
271 pipe.Set(ctx, fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
272 }
273 return nil
274 })
275 Expect(err).NotTo(HaveOccurred())
276
277 Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys="))
278 Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100"))
279 })
280
281 It("return dial timeout error", func() {
282 opt := redisRingOptions()
283 opt.DialTimeout = 250 * time.Millisecond
284 opt.Addrs = map[string]string{"ringShardNotExist": ":1997"}
285 ring = redis.NewRing(opt)
286
287 _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
288 pipe.HSet(ctx, "key", "value")
289 pipe.Expire(ctx, "key", time.Minute)
290 return nil
291 })
292
293 Expect(err).To(HaveOccurred())
294 })
295 })
296
297 Describe("new client callback", func() {
298 It("can be initialized with a new client callback", func() {
299 opts := redisRingOptions()
300 opts.NewClient = func(opt *redis.Options) *redis.Client {
301 opt.Username = "username1"
302 opt.Password = "password1"
303 return redis.NewClient(opt)
304 }
305 ring = redis.NewRing(opts)
306
307 err := ring.Ping(ctx).Err()
308 Expect(err).To(HaveOccurred())
309 Expect(err.Error()).To(ContainSubstring("WRONGPASS"))
310 })
311 })
312
313 Describe("Process hook", func() {
314 BeforeEach(func() {
315
316
317 opt := redisRingOptions()
318 opt.HeartbeatFrequency = 72 * time.Hour
319 ring = redis.NewRing(opt)
320 })
321 It("supports Process hook", func() {
322 err := ring.Set(ctx, "key", "test", 0).Err()
323 Expect(err).NotTo(HaveOccurred())
324
325 var stack []string
326
327 ring.AddHook(&hook{
328 processHook: func(hook redis.ProcessHook) redis.ProcessHook {
329 return func(ctx context.Context, cmd redis.Cmder) error {
330 Expect(cmd.String()).To(Equal("get key: "))
331 stack = append(stack, "ring.BeforeProcess")
332
333 err := hook(ctx, cmd)
334
335 Expect(cmd.String()).To(Equal("get key: test"))
336 stack = append(stack, "ring.AfterProcess")
337
338 return err
339 }
340 },
341 })
342
343 ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
344 shard.AddHook(&hook{
345 processHook: func(hook redis.ProcessHook) redis.ProcessHook {
346 return func(ctx context.Context, cmd redis.Cmder) error {
347 Expect(cmd.String()).To(Equal("get key: "))
348 stack = append(stack, "shard.BeforeProcess")
349
350 err := hook(ctx, cmd)
351
352 Expect(cmd.String()).To(Equal("get key: test"))
353 stack = append(stack, "shard.AfterProcess")
354
355 return err
356 }
357 },
358 })
359 return nil
360 })
361
362 err = ring.Get(ctx, "key").Err()
363 Expect(err).NotTo(HaveOccurred())
364 Expect(stack).To(Equal([]string{
365 "ring.BeforeProcess",
366 "shard.BeforeProcess",
367 "shard.AfterProcess",
368 "ring.AfterProcess",
369 }))
370 })
371
372 It("supports Pipeline hook", func() {
373 err := ring.Ping(ctx).Err()
374 Expect(err).NotTo(HaveOccurred())
375
376 var stack []string
377
378 ring.AddHook(&hook{
379 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
380 return func(ctx context.Context, cmds []redis.Cmder) error {
381
382 if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
383 return nil
384 }
385 Expect(len(cmds)).To(BeNumerically(">", 0))
386 Expect(cmds[0].String()).To(Equal("ping: "))
387 stack = append(stack, "ring.BeforeProcessPipeline")
388
389 err := hook(ctx, cmds)
390
391 Expect(len(cmds)).To(BeNumerically(">", 0))
392 Expect(cmds[0].String()).To(Equal("ping: PONG"))
393 stack = append(stack, "ring.AfterProcessPipeline")
394
395 return err
396 }
397 },
398 })
399
400 ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
401 shard.AddHook(&hook{
402 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
403 return func(ctx context.Context, cmds []redis.Cmder) error {
404
405 if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
406 return nil
407 }
408 Expect(len(cmds)).To(BeNumerically(">", 0))
409 Expect(cmds[0].String()).To(Equal("ping: "))
410 stack = append(stack, "shard.BeforeProcessPipeline")
411
412 err := hook(ctx, cmds)
413
414 Expect(len(cmds)).To(BeNumerically(">", 0))
415 Expect(cmds[0].String()).To(Equal("ping: PONG"))
416 stack = append(stack, "shard.AfterProcessPipeline")
417
418 return err
419 }
420 },
421 })
422 return nil
423 })
424
425 _, err = ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
426 pipe.Ping(ctx)
427 return nil
428 })
429 Expect(err).NotTo(HaveOccurred())
430 Expect(stack).To(Equal([]string{
431 "ring.BeforeProcessPipeline",
432 "shard.BeforeProcessPipeline",
433 "shard.AfterProcessPipeline",
434 "ring.AfterProcessPipeline",
435 }))
436 })
437
438 It("supports TxPipeline hook", func() {
439 err := ring.Ping(ctx).Err()
440 Expect(err).NotTo(HaveOccurred())
441
442 var stack []string
443
444 ring.AddHook(&hook{
445 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
446 return func(ctx context.Context, cmds []redis.Cmder) error {
447 defer GinkgoRecover()
448
449 if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
450 return nil
451 }
452
453 Expect(len(cmds)).To(BeNumerically(">=", 3))
454 Expect(cmds[1].String()).To(Equal("ping: "))
455 stack = append(stack, "ring.BeforeProcessPipeline")
456
457 err := hook(ctx, cmds)
458
459 Expect(len(cmds)).To(BeNumerically(">=", 3))
460 Expect(cmds[1].String()).To(Equal("ping: PONG"))
461 stack = append(stack, "ring.AfterProcessPipeline")
462
463 return err
464 }
465 },
466 })
467
468 ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
469 shard.AddHook(&hook{
470 processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
471 return func(ctx context.Context, cmds []redis.Cmder) error {
472 defer GinkgoRecover()
473
474 if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
475 return nil
476 }
477
478 Expect(len(cmds)).To(BeNumerically(">=", 3))
479 Expect(cmds[1].String()).To(Equal("ping: "))
480 stack = append(stack, "shard.BeforeProcessPipeline")
481
482 err := hook(ctx, cmds)
483
484 Expect(len(cmds)).To(BeNumerically(">=", 3))
485 Expect(cmds[1].String()).To(Equal("ping: PONG"))
486 stack = append(stack, "shard.AfterProcessPipeline")
487
488 return err
489 }
490 },
491 })
492 return nil
493 })
494
495 _, err = ring.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
496 pipe.Ping(ctx)
497 return nil
498 })
499 Expect(err).NotTo(HaveOccurred())
500 Expect(stack).To(Equal([]string{
501 "ring.BeforeProcessPipeline",
502 "shard.BeforeProcessPipeline",
503 "shard.AfterProcessPipeline",
504 "ring.AfterProcessPipeline",
505 }))
506 })
507 })
508 })
509
510 var _ = Describe("empty Redis Ring", func() {
511 var ring *redis.Ring
512
513 BeforeEach(func() {
514 ring = redis.NewRing(&redis.RingOptions{})
515 })
516
517 AfterEach(func() {
518 Expect(ring.Close()).NotTo(HaveOccurred())
519 })
520
521 It("returns an error", func() {
522 err := ring.Ping(ctx).Err()
523 Expect(err).To(MatchError("redis: all ring shards are down"))
524 })
525
526 It("pipeline returns an error", func() {
527 _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
528 pipe.Ping(ctx)
529 return nil
530 })
531 Expect(err).To(MatchError("redis: all ring shards are down"))
532 })
533 })
534
535 var _ = Describe("Ring watch", func() {
536 const heartbeat = 100 * time.Millisecond
537
538 var ring *redis.Ring
539
540 BeforeEach(func() {
541 opt := redisRingOptions()
542 opt.HeartbeatFrequency = heartbeat
543 ring = redis.NewRing(opt)
544
545 err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
546 return cl.FlushDB(ctx).Err()
547 })
548 Expect(err).NotTo(HaveOccurred())
549 })
550
551 AfterEach(func() {
552 Expect(ring.Close()).NotTo(HaveOccurred())
553 })
554
555 It("should Watch", func() {
556 var incr func(string) error
557
558
559 incr = func(key string) error {
560 err := ring.Watch(ctx, func(tx *redis.Tx) error {
561 n, err := tx.Get(ctx, key).Int64()
562 if err != nil && err != redis.Nil {
563 return err
564 }
565
566 _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
567 pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
568 return nil
569 })
570 return err
571 }, key)
572 if err == redis.TxFailedErr {
573 return incr(key)
574 }
575 return err
576 }
577
578 var wg sync.WaitGroup
579 for i := 0; i < 100; i++ {
580 wg.Add(1)
581 go func() {
582 defer GinkgoRecover()
583 defer wg.Done()
584
585 err := incr("key")
586 Expect(err).NotTo(HaveOccurred())
587 }()
588 }
589 wg.Wait()
590
591 n, err := ring.Get(ctx, "key").Int64()
592 Expect(err).NotTo(HaveOccurred())
593 Expect(n).To(Equal(int64(100)))
594 })
595
596 It("should discard", func() {
597 err := ring.Watch(ctx, func(tx *redis.Tx) error {
598 cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
599 pipe.Set(ctx, "{shard}key1", "hello1", 0)
600 pipe.Discard()
601 pipe.Set(ctx, "{shard}key2", "hello2", 0)
602 return nil
603 })
604 Expect(err).NotTo(HaveOccurred())
605 Expect(cmds).To(HaveLen(1))
606 return err
607 }, "{shard}key1", "{shard}key2")
608 Expect(err).NotTo(HaveOccurred())
609
610 get := ring.Get(ctx, "{shard}key1")
611 Expect(get.Err()).To(Equal(redis.Nil))
612 Expect(get.Val()).To(Equal(""))
613
614 get = ring.Get(ctx, "{shard}key2")
615 Expect(get.Err()).NotTo(HaveOccurred())
616 Expect(get.Val()).To(Equal("hello2"))
617 })
618
619 It("returns no error when there are no commands", func() {
620 err := ring.Watch(ctx, func(tx *redis.Tx) error {
621 _, err := tx.TxPipelined(ctx, func(redis.Pipeliner) error { return nil })
622 return err
623 }, "key")
624 Expect(err).NotTo(HaveOccurred())
625
626 v, err := ring.Ping(ctx).Result()
627 Expect(err).NotTo(HaveOccurred())
628 Expect(v).To(Equal("PONG"))
629 })
630
631 It("should exec bulks", func() {
632 const N = 20000
633
634 err := ring.Watch(ctx, func(tx *redis.Tx) error {
635 cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
636 for i := 0; i < N; i++ {
637 pipe.Incr(ctx, "key")
638 }
639 return nil
640 })
641 Expect(err).NotTo(HaveOccurred())
642 Expect(len(cmds)).To(Equal(N))
643 for _, cmd := range cmds {
644 Expect(cmd.Err()).NotTo(HaveOccurred())
645 }
646 return err
647 }, "key")
648 Expect(err).NotTo(HaveOccurred())
649
650 num, err := ring.Get(ctx, "key").Int64()
651 Expect(err).NotTo(HaveOccurred())
652 Expect(num).To(Equal(int64(N)))
653 })
654
655 It("should Watch/Unwatch", func() {
656 var C, N int
657
658 err := ring.Set(ctx, "key", "0", 0).Err()
659 Expect(err).NotTo(HaveOccurred())
660
661 perform(C, func(id int) {
662 for i := 0; i < N; i++ {
663 err := ring.Watch(ctx, func(tx *redis.Tx) error {
664 val, err := tx.Get(ctx, "key").Result()
665 Expect(err).NotTo(HaveOccurred())
666 Expect(val).NotTo(Equal(redis.Nil))
667
668 num, err := strconv.ParseInt(val, 10, 64)
669 Expect(err).NotTo(HaveOccurred())
670
671 cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
672 pipe.Set(ctx, "key", strconv.FormatInt(num+1, 10), 0)
673 return nil
674 })
675 Expect(cmds).To(HaveLen(1))
676 return err
677 }, "key")
678 if err == redis.TxFailedErr {
679 i--
680 continue
681 }
682 Expect(err).NotTo(HaveOccurred())
683 }
684 })
685
686 val, err := ring.Get(ctx, "key").Int64()
687 Expect(err).NotTo(HaveOccurred())
688 Expect(val).To(Equal(int64(C * N)))
689 })
690
691 It("should close Tx without closing the client", func() {
692 err := ring.Watch(ctx, func(tx *redis.Tx) error {
693 _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
694 pipe.Ping(ctx)
695 return nil
696 })
697 return err
698 }, "key")
699 Expect(err).NotTo(HaveOccurred())
700
701 Expect(ring.Ping(ctx).Err()).NotTo(HaveOccurred())
702 })
703
704 It("respects max size on multi", func() {
705
706
707
708
709
710
711 opt := redisRingOptions()
712 opt.HeartbeatFrequency = 72 * time.Hour
713 ring = redis.NewRing(opt)
714
715 perform(1000, func(id int) {
716 var ping *redis.StatusCmd
717
718 err := ring.Watch(ctx, func(tx *redis.Tx) error {
719 cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
720 ping = pipe.Ping(ctx)
721 return nil
722 })
723 Expect(err).NotTo(HaveOccurred())
724 Expect(cmds).To(HaveLen(1))
725 return err
726 }, "key")
727 Expect(err).NotTo(HaveOccurred())
728
729 Expect(ping.Err()).NotTo(HaveOccurred())
730 Expect(ping.Val()).To(Equal("PONG"))
731 })
732
733 ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
734 defer GinkgoRecover()
735
736 pool := cl.Pool()
737 Expect(pool.Len()).To(BeNumerically("<=", 10))
738 Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
739 Expect(pool.Len()).To(Equal(pool.IdleLen()))
740
741 return nil
742 })
743 })
744 })
745
746 var _ = Describe("Ring Tx timeout", func() {
747 const heartbeat = 100 * time.Millisecond
748
749 var ring *redis.Ring
750
751 AfterEach(func() {
752 _ = ring.Close()
753 })
754
755 testTimeout := func() {
756 It("Tx timeouts", func() {
757 err := ring.Watch(ctx, func(tx *redis.Tx) error {
758 return tx.Ping(ctx).Err()
759 }, "foo")
760 Expect(err).To(HaveOccurred())
761 Expect(err.(net.Error).Timeout()).To(BeTrue())
762 })
763
764 It("Tx Pipeline timeouts", func() {
765 err := ring.Watch(ctx, func(tx *redis.Tx) error {
766 _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
767 pipe.Ping(ctx)
768 return nil
769 })
770 return err
771 }, "foo")
772 Expect(err).To(HaveOccurred())
773 Expect(err.(net.Error).Timeout()).To(BeTrue())
774 })
775 }
776
777 const pause = 5 * time.Second
778
779 Context("read/write timeout", func() {
780 BeforeEach(func() {
781 opt := redisRingOptions()
782 opt.ReadTimeout = 250 * time.Millisecond
783 opt.WriteTimeout = 250 * time.Millisecond
784 opt.HeartbeatFrequency = heartbeat
785 ring = redis.NewRing(opt)
786
787 err := ring.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
788 return client.ClientPause(ctx, pause).Err()
789 })
790 Expect(err).NotTo(HaveOccurred())
791 })
792
793 AfterEach(func() {
794 _ = ring.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
795 defer GinkgoRecover()
796 Eventually(func() error {
797 return client.Ping(ctx).Err()
798 }, 2*pause).ShouldNot(HaveOccurred())
799 return nil
800 })
801 })
802
803 testTimeout()
804 })
805 })
806
807 var _ = Describe("Ring GetShardClients and GetShardClientForKey", func() {
808 var ring *redis.Ring
809
810 BeforeEach(func() {
811 ring = redis.NewRing(&redis.RingOptions{
812 Addrs: map[string]string{
813 "shard1": ":6379",
814 "shard2": ":6380",
815 },
816 })
817 })
818
819 AfterEach(func() {
820 Expect(ring.Close()).NotTo(HaveOccurred())
821 })
822
823 It("GetShardClients returns active shard clients", func() {
824 shards := ring.GetShardClients()
825
826
827
828
829 if len(shards) == 0 {
830
831 Skip("No active shards found (Redis servers not running)")
832 } else {
833 Expect(len(shards)).To(BeNumerically(">", 0))
834 for _, client := range shards {
835 Expect(client).NotTo(BeNil())
836 }
837 }
838 })
839
840 It("GetShardClientForKey returns correct shard for keys", func() {
841 testKeys := []string{"key1", "key2", "user:123", "channel:test"}
842
843 for _, key := range testKeys {
844 client, err := ring.GetShardClientForKey(key)
845 Expect(err).NotTo(HaveOccurred())
846 Expect(client).NotTo(BeNil())
847 }
848 })
849
850 It("GetShardClientForKey is consistent for same key", func() {
851 key := "test:consistency"
852
853
854
855 var firstClient *redis.Client
856 for i := 0; i < 5; i++ {
857 client, err := ring.GetShardClientForKey(key)
858 Expect(err).NotTo(HaveOccurred())
859 Expect(client).NotTo(BeNil())
860
861 if i == 0 {
862 firstClient = client
863 } else {
864 Expect(client.String()).To(Equal(firstClient.String()))
865 }
866 }
867 })
868
869 It("GetShardClientForKey distributes keys across shards", func() {
870 testKeys := []string{"key1", "key2", "key3", "key4", "key5"}
871 shardMap := make(map[string]int)
872
873 for _, key := range testKeys {
874 client, err := ring.GetShardClientForKey(key)
875 Expect(err).NotTo(HaveOccurred())
876 shardMap[client.String()]++
877 }
878
879
880 Expect(len(shardMap)).To(BeNumerically(">=", 1))
881
882 Expect(len(shardMap)).To(BeNumerically("<=", 2))
883 })
884 })
885
View as plain text