...

Source file src/github.com/redis/go-redis/v9/ring_test.go

Documentation: github.com/redis/go-redis/v9

     1  package redis_test
     2  
     3  import (
     4  	"context"
     5  	"crypto/rand"
     6  	"fmt"
     7  	"net"
     8  	"strconv"
     9  	"sync"
    10  	"time"
    11  
    12  	. "github.com/bsm/ginkgo/v2"
    13  	. "github.com/bsm/gomega"
    14  
    15  	"github.com/redis/go-redis/v9"
    16  )
    17  
    18  var _ = Describe("Redis Ring PROTO 2", func() {
    19  	const heartbeat = 100 * time.Millisecond
    20  
    21  	var ring *redis.Ring
    22  
    23  	BeforeEach(func() {
    24  		opt := redisRingOptions()
    25  		opt.Protocol = 2
    26  		opt.HeartbeatFrequency = heartbeat
    27  		ring = redis.NewRing(opt)
    28  
    29  		err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
    30  			return cl.FlushDB(ctx).Err()
    31  		})
    32  		Expect(err).NotTo(HaveOccurred())
    33  	})
    34  
    35  	AfterEach(func() {
    36  		Expect(ring.Close()).NotTo(HaveOccurred())
    37  	})
    38  
    39  	It("should ring PROTO 2", func() {
    40  		_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
    41  			val, err := c.Do(ctx, "HELLO").Result()
    42  			Expect(err).NotTo(HaveOccurred())
    43  			Expect(val).Should(ContainElements("proto", int64(2)))
    44  			return nil
    45  		})
    46  	})
    47  })
    48  
    49  var _ = Describe("Redis Ring", func() {
    50  	const heartbeat = 100 * time.Millisecond
    51  
    52  	var ring *redis.Ring
    53  
    54  	setRingKeys := func() {
    55  		for i := 0; i < 100; i++ {
    56  			err := ring.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
    57  			Expect(err).NotTo(HaveOccurred())
    58  		}
    59  	}
    60  
    61  	BeforeEach(func() {
    62  		opt := redisRingOptions()
    63  		opt.ClientName = "ring_hi"
    64  		opt.HeartbeatFrequency = heartbeat
    65  		ring = redis.NewRing(opt)
    66  
    67  		err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
    68  			return cl.FlushDB(ctx).Err()
    69  		})
    70  		Expect(err).NotTo(HaveOccurred())
    71  	})
    72  
    73  	AfterEach(func() {
    74  		Expect(ring.Close()).NotTo(HaveOccurred())
    75  	})
    76  
    77  	It("do", func() {
    78  		val, err := ring.Do(ctx, "ping").Result()
    79  		Expect(err).NotTo(HaveOccurred())
    80  		Expect(val).To(Equal("PONG"))
    81  	})
    82  
    83  	It("supports context", func() {
    84  		ctx, cancel := context.WithCancel(ctx)
    85  		cancel()
    86  
    87  		err := ring.Ping(ctx).Err()
    88  		Expect(err).To(MatchError("context canceled"))
    89  	})
    90  
    91  	It("should ring client setname", func() {
    92  		err := ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
    93  			return c.Ping(ctx).Err()
    94  		})
    95  		Expect(err).NotTo(HaveOccurred())
    96  
    97  		_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
    98  			val, err := c.ClientList(ctx).Result()
    99  			Expect(err).NotTo(HaveOccurred())
   100  			Expect(val).Should(ContainSubstring("name=ring_hi"))
   101  			return nil
   102  		})
   103  	})
   104  
   105  	It("should ring PROTO 3", func() {
   106  		_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
   107  			val, err := c.Do(ctx, "HELLO").Result()
   108  			Expect(err).NotTo(HaveOccurred())
   109  			Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
   110  			return nil
   111  		})
   112  	})
   113  
   114  	It("distributes keys", func() {
   115  		setRingKeys()
   116  
   117  		// Both shards should have some keys now.
   118  		Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56"))
   119  		Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
   120  	})
   121  
   122  	It("distributes keys when using EVAL", func() {
   123  		script := redis.NewScript(`
   124  			local r = redis.call('SET', KEYS[1], ARGV[1])
   125  			return r
   126  		`)
   127  
   128  		var key string
   129  		for i := 0; i < 100; i++ {
   130  			key = fmt.Sprintf("key%d", i)
   131  			err := script.Run(ctx, ring, []string{key}, "value").Err()
   132  			Expect(err).NotTo(HaveOccurred())
   133  		}
   134  
   135  		Expect(ringShard1.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=56"))
   136  		Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=44"))
   137  	})
   138  
   139  	It("supports hash tags", func() {
   140  		for i := 0; i < 100; i++ {
   141  			err := ring.Set(ctx, fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
   142  			Expect(err).NotTo(HaveOccurred())
   143  		}
   144  
   145  		Expect(ringShard1.Info(ctx, "keyspace").Val()).ToNot(ContainSubstring("keys="))
   146  		Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
   147  	})
   148  
   149  	Describe("[new] dynamic setting ring shards", func() {
   150  		It("downscale shard and check reuse shard, upscale shard and check reuse", func() {
   151  			Expect(ring.Len(), 2)
   152  
   153  			wantShard := ring.ShardByName("ringShardOne")
   154  			ring.SetAddrs(map[string]string{
   155  				"ringShardOne": ":" + ringShard1Port,
   156  			})
   157  			Expect(ring.Len(), 1)
   158  			gotShard := ring.ShardByName("ringShardOne")
   159  			Expect(gotShard).To(BeIdenticalTo(wantShard))
   160  
   161  			ring.SetAddrs(map[string]string{
   162  				"ringShardOne": ":" + ringShard1Port,
   163  				"ringShardTwo": ":" + ringShard2Port,
   164  			})
   165  			Expect(ring.Len(), 2)
   166  			gotShard = ring.ShardByName("ringShardOne")
   167  			Expect(gotShard).To(BeIdenticalTo(wantShard))
   168  		})
   169  
   170  		It("uses 3 shards after setting it to 3 shards", func() {
   171  			Expect(ring.Len(), 2)
   172  
   173  			shardName1 := "ringShardOne"
   174  			shardAddr1 := ":" + ringShard1Port
   175  			wantShard1 := ring.ShardByName(shardName1)
   176  			shardName2 := "ringShardTwo"
   177  			shardAddr2 := ":" + ringShard2Port
   178  			wantShard2 := ring.ShardByName(shardName2)
   179  			shardName3 := "ringShardThree"
   180  			shardAddr3 := ":" + ringShard3Port
   181  
   182  			ring.SetAddrs(map[string]string{
   183  				shardName1: shardAddr1,
   184  				shardName2: shardAddr2,
   185  				shardName3: shardAddr3,
   186  			})
   187  			Expect(ring.Len(), 3)
   188  			gotShard1 := ring.ShardByName(shardName1)
   189  			gotShard2 := ring.ShardByName(shardName2)
   190  			gotShard3 := ring.ShardByName(shardName3)
   191  			Expect(gotShard1).To(BeIdenticalTo(wantShard1))
   192  			Expect(gotShard2).To(BeIdenticalTo(wantShard2))
   193  			Expect(gotShard3).ToNot(BeNil())
   194  
   195  			ring.SetAddrs(map[string]string{
   196  				shardName1: shardAddr1,
   197  				shardName2: shardAddr2,
   198  			})
   199  			Expect(ring.Len(), 2)
   200  			gotShard1 = ring.ShardByName(shardName1)
   201  			gotShard2 = ring.ShardByName(shardName2)
   202  			gotShard3 = ring.ShardByName(shardName3)
   203  			Expect(gotShard1).To(BeIdenticalTo(wantShard1))
   204  			Expect(gotShard2).To(BeIdenticalTo(wantShard2))
   205  			Expect(gotShard3).To(BeNil())
   206  		})
   207  	})
   208  	Describe("pipeline", func() {
   209  		It("doesn't panic closed ring, returns error", func() {
   210  			pipe := ring.Pipeline()
   211  			for i := 0; i < 3; i++ {
   212  				err := pipe.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
   213  				Expect(err).NotTo(HaveOccurred())
   214  			}
   215  
   216  			Expect(ring.Close()).NotTo(HaveOccurred())
   217  
   218  			Expect(func() {
   219  				_, execErr := pipe.Exec(ctx)
   220  				Expect(execErr).To(HaveOccurred())
   221  			}).NotTo(Panic())
   222  		})
   223  
   224  		It("distributes keys", func() {
   225  			pipe := ring.Pipeline()
   226  			for i := 0; i < 100; i++ {
   227  				err := pipe.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
   228  				Expect(err).NotTo(HaveOccurred())
   229  			}
   230  			cmds, err := pipe.Exec(ctx)
   231  			Expect(err).NotTo(HaveOccurred())
   232  			Expect(cmds).To(HaveLen(100))
   233  
   234  			for _, cmd := range cmds {
   235  				Expect(cmd.Err()).NotTo(HaveOccurred())
   236  				Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
   237  			}
   238  
   239  			// Both shards should have some keys now.
   240  			Expect(ringShard1.Info(ctx).Val()).To(ContainSubstring("keys=56"))
   241  			Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=44"))
   242  		})
   243  
   244  		It("is consistent with ring", func() {
   245  			var keys []string
   246  			for i := 0; i < 100; i++ {
   247  				key := make([]byte, 64)
   248  				_, err := rand.Read(key)
   249  				Expect(err).NotTo(HaveOccurred())
   250  				keys = append(keys, string(key))
   251  			}
   252  
   253  			_, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
   254  				for _, key := range keys {
   255  					pipe.Set(ctx, key, "value", 0).Err()
   256  				}
   257  				return nil
   258  			})
   259  			Expect(err).NotTo(HaveOccurred())
   260  
   261  			for _, key := range keys {
   262  				val, err := ring.Get(ctx, key).Result()
   263  				Expect(err).NotTo(HaveOccurred())
   264  				Expect(val).To(Equal("value"))
   265  			}
   266  		})
   267  
   268  		It("supports hash tags", func() {
   269  			_, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
   270  				for i := 0; i < 100; i++ {
   271  					pipe.Set(ctx, fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
   272  				}
   273  				return nil
   274  			})
   275  			Expect(err).NotTo(HaveOccurred())
   276  
   277  			Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys="))
   278  			Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100"))
   279  		})
   280  
   281  		It("return dial timeout error", func() {
   282  			opt := redisRingOptions()
   283  			opt.DialTimeout = 250 * time.Millisecond
   284  			opt.Addrs = map[string]string{"ringShardNotExist": ":1997"}
   285  			ring = redis.NewRing(opt)
   286  
   287  			_, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
   288  				pipe.HSet(ctx, "key", "value")
   289  				pipe.Expire(ctx, "key", time.Minute)
   290  				return nil
   291  			})
   292  
   293  			Expect(err).To(HaveOccurred())
   294  		})
   295  	})
   296  
   297  	Describe("new client callback", func() {
   298  		It("can be initialized with a new client callback", func() {
   299  			opts := redisRingOptions()
   300  			opts.NewClient = func(opt *redis.Options) *redis.Client {
   301  				opt.Username = "username1"
   302  				opt.Password = "password1"
   303  				return redis.NewClient(opt)
   304  			}
   305  			ring = redis.NewRing(opts)
   306  
   307  			err := ring.Ping(ctx).Err()
   308  			Expect(err).To(HaveOccurred())
   309  			Expect(err.Error()).To(ContainSubstring("WRONGPASS"))
   310  		})
   311  	})
   312  
   313  	Describe("Process hook", func() {
   314  		BeforeEach(func() {
   315  			// the health check leads to data race for variable "stack []string".
   316  			// here, the health check time is set to 72 hours to avoid health check
   317  			opt := redisRingOptions()
   318  			opt.HeartbeatFrequency = 72 * time.Hour
   319  			ring = redis.NewRing(opt)
   320  		})
   321  		It("supports Process hook", func() {
   322  			err := ring.Set(ctx, "key", "test", 0).Err()
   323  			Expect(err).NotTo(HaveOccurred())
   324  
   325  			var stack []string
   326  
   327  			ring.AddHook(&hook{
   328  				processHook: func(hook redis.ProcessHook) redis.ProcessHook {
   329  					return func(ctx context.Context, cmd redis.Cmder) error {
   330  						Expect(cmd.String()).To(Equal("get key: "))
   331  						stack = append(stack, "ring.BeforeProcess")
   332  
   333  						err := hook(ctx, cmd)
   334  
   335  						Expect(cmd.String()).To(Equal("get key: test"))
   336  						stack = append(stack, "ring.AfterProcess")
   337  
   338  						return err
   339  					}
   340  				},
   341  			})
   342  
   343  			ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
   344  				shard.AddHook(&hook{
   345  					processHook: func(hook redis.ProcessHook) redis.ProcessHook {
   346  						return func(ctx context.Context, cmd redis.Cmder) error {
   347  							Expect(cmd.String()).To(Equal("get key: "))
   348  							stack = append(stack, "shard.BeforeProcess")
   349  
   350  							err := hook(ctx, cmd)
   351  
   352  							Expect(cmd.String()).To(Equal("get key: test"))
   353  							stack = append(stack, "shard.AfterProcess")
   354  
   355  							return err
   356  						}
   357  					},
   358  				})
   359  				return nil
   360  			})
   361  
   362  			err = ring.Get(ctx, "key").Err()
   363  			Expect(err).NotTo(HaveOccurred())
   364  			Expect(stack).To(Equal([]string{
   365  				"ring.BeforeProcess",
   366  				"shard.BeforeProcess",
   367  				"shard.AfterProcess",
   368  				"ring.AfterProcess",
   369  			}))
   370  		})
   371  
   372  		It("supports Pipeline hook", func() {
   373  			err := ring.Ping(ctx).Err()
   374  			Expect(err).NotTo(HaveOccurred())
   375  
   376  			var stack []string
   377  
   378  			ring.AddHook(&hook{
   379  				processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
   380  					return func(ctx context.Context, cmds []redis.Cmder) error {
   381  						// skip the connection initialization
   382  						if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
   383  							return nil
   384  						}
   385  						Expect(len(cmds)).To(BeNumerically(">", 0))
   386  						Expect(cmds[0].String()).To(Equal("ping: "))
   387  						stack = append(stack, "ring.BeforeProcessPipeline")
   388  
   389  						err := hook(ctx, cmds)
   390  
   391  						Expect(len(cmds)).To(BeNumerically(">", 0))
   392  						Expect(cmds[0].String()).To(Equal("ping: PONG"))
   393  						stack = append(stack, "ring.AfterProcessPipeline")
   394  
   395  						return err
   396  					}
   397  				},
   398  			})
   399  
   400  			ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
   401  				shard.AddHook(&hook{
   402  					processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
   403  						return func(ctx context.Context, cmds []redis.Cmder) error {
   404  							// skip the connection initialization
   405  							if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
   406  								return nil
   407  							}
   408  							Expect(len(cmds)).To(BeNumerically(">", 0))
   409  							Expect(cmds[0].String()).To(Equal("ping: "))
   410  							stack = append(stack, "shard.BeforeProcessPipeline")
   411  
   412  							err := hook(ctx, cmds)
   413  
   414  							Expect(len(cmds)).To(BeNumerically(">", 0))
   415  							Expect(cmds[0].String()).To(Equal("ping: PONG"))
   416  							stack = append(stack, "shard.AfterProcessPipeline")
   417  
   418  							return err
   419  						}
   420  					},
   421  				})
   422  				return nil
   423  			})
   424  
   425  			_, err = ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
   426  				pipe.Ping(ctx)
   427  				return nil
   428  			})
   429  			Expect(err).NotTo(HaveOccurred())
   430  			Expect(stack).To(Equal([]string{
   431  				"ring.BeforeProcessPipeline",
   432  				"shard.BeforeProcessPipeline",
   433  				"shard.AfterProcessPipeline",
   434  				"ring.AfterProcessPipeline",
   435  			}))
   436  		})
   437  
   438  		It("supports TxPipeline hook", func() {
   439  			err := ring.Ping(ctx).Err()
   440  			Expect(err).NotTo(HaveOccurred())
   441  
   442  			var stack []string
   443  
   444  			ring.AddHook(&hook{
   445  				processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
   446  					return func(ctx context.Context, cmds []redis.Cmder) error {
   447  						defer GinkgoRecover()
   448  						// skip the connection initialization
   449  						if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
   450  							return nil
   451  						}
   452  
   453  						Expect(len(cmds)).To(BeNumerically(">=", 3))
   454  						Expect(cmds[1].String()).To(Equal("ping: "))
   455  						stack = append(stack, "ring.BeforeProcessPipeline")
   456  
   457  						err := hook(ctx, cmds)
   458  
   459  						Expect(len(cmds)).To(BeNumerically(">=", 3))
   460  						Expect(cmds[1].String()).To(Equal("ping: PONG"))
   461  						stack = append(stack, "ring.AfterProcessPipeline")
   462  
   463  						return err
   464  					}
   465  				},
   466  			})
   467  
   468  			ring.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
   469  				shard.AddHook(&hook{
   470  					processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
   471  						return func(ctx context.Context, cmds []redis.Cmder) error {
   472  							defer GinkgoRecover()
   473  							// skip the connection initialization
   474  							if cmds[0].Name() == "hello" || cmds[0].Name() == "client" {
   475  								return nil
   476  							}
   477  
   478  							Expect(len(cmds)).To(BeNumerically(">=", 3))
   479  							Expect(cmds[1].String()).To(Equal("ping: "))
   480  							stack = append(stack, "shard.BeforeProcessPipeline")
   481  
   482  							err := hook(ctx, cmds)
   483  
   484  							Expect(len(cmds)).To(BeNumerically(">=", 3))
   485  							Expect(cmds[1].String()).To(Equal("ping: PONG"))
   486  							stack = append(stack, "shard.AfterProcessPipeline")
   487  
   488  							return err
   489  						}
   490  					},
   491  				})
   492  				return nil
   493  			})
   494  
   495  			_, err = ring.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   496  				pipe.Ping(ctx)
   497  				return nil
   498  			})
   499  			Expect(err).NotTo(HaveOccurred())
   500  			Expect(stack).To(Equal([]string{
   501  				"ring.BeforeProcessPipeline",
   502  				"shard.BeforeProcessPipeline",
   503  				"shard.AfterProcessPipeline",
   504  				"ring.AfterProcessPipeline",
   505  			}))
   506  		})
   507  	})
   508  })
   509  
   510  var _ = Describe("empty Redis Ring", func() {
   511  	var ring *redis.Ring
   512  
   513  	BeforeEach(func() {
   514  		ring = redis.NewRing(&redis.RingOptions{})
   515  	})
   516  
   517  	AfterEach(func() {
   518  		Expect(ring.Close()).NotTo(HaveOccurred())
   519  	})
   520  
   521  	It("returns an error", func() {
   522  		err := ring.Ping(ctx).Err()
   523  		Expect(err).To(MatchError("redis: all ring shards are down"))
   524  	})
   525  
   526  	It("pipeline returns an error", func() {
   527  		_, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
   528  			pipe.Ping(ctx)
   529  			return nil
   530  		})
   531  		Expect(err).To(MatchError("redis: all ring shards are down"))
   532  	})
   533  })
   534  
   535  var _ = Describe("Ring watch", func() {
   536  	const heartbeat = 100 * time.Millisecond
   537  
   538  	var ring *redis.Ring
   539  
   540  	BeforeEach(func() {
   541  		opt := redisRingOptions()
   542  		opt.HeartbeatFrequency = heartbeat
   543  		ring = redis.NewRing(opt)
   544  
   545  		err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
   546  			return cl.FlushDB(ctx).Err()
   547  		})
   548  		Expect(err).NotTo(HaveOccurred())
   549  	})
   550  
   551  	AfterEach(func() {
   552  		Expect(ring.Close()).NotTo(HaveOccurred())
   553  	})
   554  
   555  	It("should Watch", func() {
   556  		var incr func(string) error
   557  
   558  		// Transactionally increments key using GET and SET commands.
   559  		incr = func(key string) error {
   560  			err := ring.Watch(ctx, func(tx *redis.Tx) error {
   561  				n, err := tx.Get(ctx, key).Int64()
   562  				if err != nil && err != redis.Nil {
   563  					return err
   564  				}
   565  
   566  				_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   567  					pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
   568  					return nil
   569  				})
   570  				return err
   571  			}, key)
   572  			if err == redis.TxFailedErr {
   573  				return incr(key)
   574  			}
   575  			return err
   576  		}
   577  
   578  		var wg sync.WaitGroup
   579  		for i := 0; i < 100; i++ {
   580  			wg.Add(1)
   581  			go func() {
   582  				defer GinkgoRecover()
   583  				defer wg.Done()
   584  
   585  				err := incr("key")
   586  				Expect(err).NotTo(HaveOccurred())
   587  			}()
   588  		}
   589  		wg.Wait()
   590  
   591  		n, err := ring.Get(ctx, "key").Int64()
   592  		Expect(err).NotTo(HaveOccurred())
   593  		Expect(n).To(Equal(int64(100)))
   594  	})
   595  
   596  	It("should discard", func() {
   597  		err := ring.Watch(ctx, func(tx *redis.Tx) error {
   598  			cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   599  				pipe.Set(ctx, "{shard}key1", "hello1", 0)
   600  				pipe.Discard()
   601  				pipe.Set(ctx, "{shard}key2", "hello2", 0)
   602  				return nil
   603  			})
   604  			Expect(err).NotTo(HaveOccurred())
   605  			Expect(cmds).To(HaveLen(1))
   606  			return err
   607  		}, "{shard}key1", "{shard}key2")
   608  		Expect(err).NotTo(HaveOccurred())
   609  
   610  		get := ring.Get(ctx, "{shard}key1")
   611  		Expect(get.Err()).To(Equal(redis.Nil))
   612  		Expect(get.Val()).To(Equal(""))
   613  
   614  		get = ring.Get(ctx, "{shard}key2")
   615  		Expect(get.Err()).NotTo(HaveOccurred())
   616  		Expect(get.Val()).To(Equal("hello2"))
   617  	})
   618  
   619  	It("returns no error when there are no commands", func() {
   620  		err := ring.Watch(ctx, func(tx *redis.Tx) error {
   621  			_, err := tx.TxPipelined(ctx, func(redis.Pipeliner) error { return nil })
   622  			return err
   623  		}, "key")
   624  		Expect(err).NotTo(HaveOccurred())
   625  
   626  		v, err := ring.Ping(ctx).Result()
   627  		Expect(err).NotTo(HaveOccurred())
   628  		Expect(v).To(Equal("PONG"))
   629  	})
   630  
   631  	It("should exec bulks", func() {
   632  		const N = 20000
   633  
   634  		err := ring.Watch(ctx, func(tx *redis.Tx) error {
   635  			cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   636  				for i := 0; i < N; i++ {
   637  					pipe.Incr(ctx, "key")
   638  				}
   639  				return nil
   640  			})
   641  			Expect(err).NotTo(HaveOccurred())
   642  			Expect(len(cmds)).To(Equal(N))
   643  			for _, cmd := range cmds {
   644  				Expect(cmd.Err()).NotTo(HaveOccurred())
   645  			}
   646  			return err
   647  		}, "key")
   648  		Expect(err).NotTo(HaveOccurred())
   649  
   650  		num, err := ring.Get(ctx, "key").Int64()
   651  		Expect(err).NotTo(HaveOccurred())
   652  		Expect(num).To(Equal(int64(N)))
   653  	})
   654  
   655  	It("should Watch/Unwatch", func() {
   656  		var C, N int
   657  
   658  		err := ring.Set(ctx, "key", "0", 0).Err()
   659  		Expect(err).NotTo(HaveOccurred())
   660  
   661  		perform(C, func(id int) {
   662  			for i := 0; i < N; i++ {
   663  				err := ring.Watch(ctx, func(tx *redis.Tx) error {
   664  					val, err := tx.Get(ctx, "key").Result()
   665  					Expect(err).NotTo(HaveOccurred())
   666  					Expect(val).NotTo(Equal(redis.Nil))
   667  
   668  					num, err := strconv.ParseInt(val, 10, 64)
   669  					Expect(err).NotTo(HaveOccurred())
   670  
   671  					cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   672  						pipe.Set(ctx, "key", strconv.FormatInt(num+1, 10), 0)
   673  						return nil
   674  					})
   675  					Expect(cmds).To(HaveLen(1))
   676  					return err
   677  				}, "key")
   678  				if err == redis.TxFailedErr {
   679  					i--
   680  					continue
   681  				}
   682  				Expect(err).NotTo(HaveOccurred())
   683  			}
   684  		})
   685  
   686  		val, err := ring.Get(ctx, "key").Int64()
   687  		Expect(err).NotTo(HaveOccurred())
   688  		Expect(val).To(Equal(int64(C * N)))
   689  	})
   690  
   691  	It("should close Tx without closing the client", func() {
   692  		err := ring.Watch(ctx, func(tx *redis.Tx) error {
   693  			_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   694  				pipe.Ping(ctx)
   695  				return nil
   696  			})
   697  			return err
   698  		}, "key")
   699  		Expect(err).NotTo(HaveOccurred())
   700  
   701  		Expect(ring.Ping(ctx).Err()).NotTo(HaveOccurred())
   702  	})
   703  
   704  	It("respects max size on multi", func() {
   705  		//this test checks the number of "pool.conn"
   706  		//if the health check is performed at the same time
   707  		//conn will be used, resulting in an abnormal number of "pool.conn".
   708  		//
   709  		//redis.NewRing() does not have an option to prohibit health checks.
   710  		//set a relatively large time here to avoid health checks.
   711  		opt := redisRingOptions()
   712  		opt.HeartbeatFrequency = 72 * time.Hour
   713  		ring = redis.NewRing(opt)
   714  
   715  		perform(1000, func(id int) {
   716  			var ping *redis.StatusCmd
   717  
   718  			err := ring.Watch(ctx, func(tx *redis.Tx) error {
   719  				cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   720  					ping = pipe.Ping(ctx)
   721  					return nil
   722  				})
   723  				Expect(err).NotTo(HaveOccurred())
   724  				Expect(cmds).To(HaveLen(1))
   725  				return err
   726  			}, "key")
   727  			Expect(err).NotTo(HaveOccurred())
   728  
   729  			Expect(ping.Err()).NotTo(HaveOccurred())
   730  			Expect(ping.Val()).To(Equal("PONG"))
   731  		})
   732  
   733  		ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
   734  			defer GinkgoRecover()
   735  
   736  			pool := cl.Pool()
   737  			Expect(pool.Len()).To(BeNumerically("<=", 10))
   738  			Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
   739  			Expect(pool.Len()).To(Equal(pool.IdleLen()))
   740  
   741  			return nil
   742  		})
   743  	})
   744  })
   745  
   746  var _ = Describe("Ring Tx timeout", func() {
   747  	const heartbeat = 100 * time.Millisecond
   748  
   749  	var ring *redis.Ring
   750  
   751  	AfterEach(func() {
   752  		_ = ring.Close()
   753  	})
   754  
   755  	testTimeout := func() {
   756  		It("Tx timeouts", func() {
   757  			err := ring.Watch(ctx, func(tx *redis.Tx) error {
   758  				return tx.Ping(ctx).Err()
   759  			}, "foo")
   760  			Expect(err).To(HaveOccurred())
   761  			Expect(err.(net.Error).Timeout()).To(BeTrue())
   762  		})
   763  
   764  		It("Tx Pipeline timeouts", func() {
   765  			err := ring.Watch(ctx, func(tx *redis.Tx) error {
   766  				_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   767  					pipe.Ping(ctx)
   768  					return nil
   769  				})
   770  				return err
   771  			}, "foo")
   772  			Expect(err).To(HaveOccurred())
   773  			Expect(err.(net.Error).Timeout()).To(BeTrue())
   774  		})
   775  	}
   776  
   777  	const pause = 5 * time.Second
   778  
   779  	Context("read/write timeout", func() {
   780  		BeforeEach(func() {
   781  			opt := redisRingOptions()
   782  			opt.ReadTimeout = 250 * time.Millisecond
   783  			opt.WriteTimeout = 250 * time.Millisecond
   784  			opt.HeartbeatFrequency = heartbeat
   785  			ring = redis.NewRing(opt)
   786  
   787  			err := ring.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
   788  				return client.ClientPause(ctx, pause).Err()
   789  			})
   790  			Expect(err).NotTo(HaveOccurred())
   791  		})
   792  
   793  		AfterEach(func() {
   794  			_ = ring.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
   795  				defer GinkgoRecover()
   796  				Eventually(func() error {
   797  					return client.Ping(ctx).Err()
   798  				}, 2*pause).ShouldNot(HaveOccurred())
   799  				return nil
   800  			})
   801  		})
   802  
   803  		testTimeout()
   804  	})
   805  })
   806  
   807  var _ = Describe("Ring GetShardClients and GetShardClientForKey", func() {
   808  	var ring *redis.Ring
   809  
   810  	BeforeEach(func() {
   811  		ring = redis.NewRing(&redis.RingOptions{
   812  			Addrs: map[string]string{
   813  				"shard1": ":6379",
   814  				"shard2": ":6380",
   815  			},
   816  		})
   817  	})
   818  
   819  	AfterEach(func() {
   820  		Expect(ring.Close()).NotTo(HaveOccurred())
   821  	})
   822  
   823  	It("GetShardClients returns active shard clients", func() {
   824  		shards := ring.GetShardClients()
   825  		// Note: This test will pass even if Redis servers are not running,
   826  		// because GetShardClients only returns clients that are marked as "up",
   827  		// and newly created shards start as "up" until the first health check fails.
   828  
   829  		if len(shards) == 0 {
   830  			// Expected if Redis servers are not running
   831  			Skip("No active shards found (Redis servers not running)")
   832  		} else {
   833  			Expect(len(shards)).To(BeNumerically(">", 0))
   834  			for _, client := range shards {
   835  				Expect(client).NotTo(BeNil())
   836  			}
   837  		}
   838  	})
   839  
   840  	It("GetShardClientForKey returns correct shard for keys", func() {
   841  		testKeys := []string{"key1", "key2", "user:123", "channel:test"}
   842  
   843  		for _, key := range testKeys {
   844  			client, err := ring.GetShardClientForKey(key)
   845  			Expect(err).NotTo(HaveOccurred())
   846  			Expect(client).NotTo(BeNil())
   847  		}
   848  	})
   849  
   850  	It("GetShardClientForKey is consistent for same key", func() {
   851  		key := "test:consistency"
   852  
   853  		// Call GetShardClientForKey multiple times with the same key
   854  		// Should always return the same shard
   855  		var firstClient *redis.Client
   856  		for i := 0; i < 5; i++ {
   857  			client, err := ring.GetShardClientForKey(key)
   858  			Expect(err).NotTo(HaveOccurred())
   859  			Expect(client).NotTo(BeNil())
   860  
   861  			if i == 0 {
   862  				firstClient = client
   863  			} else {
   864  				Expect(client.String()).To(Equal(firstClient.String()))
   865  			}
   866  		}
   867  	})
   868  
   869  	It("GetShardClientForKey distributes keys across shards", func() {
   870  		testKeys := []string{"key1", "key2", "key3", "key4", "key5"}
   871  		shardMap := make(map[string]int)
   872  
   873  		for _, key := range testKeys {
   874  			client, err := ring.GetShardClientForKey(key)
   875  			Expect(err).NotTo(HaveOccurred())
   876  			shardMap[client.String()]++
   877  		}
   878  
   879  		// Should have at least 1 shard (could be all keys go to same shard due to hashing)
   880  		Expect(len(shardMap)).To(BeNumerically(">=", 1))
   881  		// But with multiple keys, we expect some distribution
   882  		Expect(len(shardMap)).To(BeNumerically("<=", 2)) // At most 2 shards (our setup)
   883  	})
   884  })
   885  

View as plain text