...

Source file src/github.com/redis/go-redis/v9/osscluster_test.go

Documentation: github.com/redis/go-redis/v9

     1  package redis_test
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"errors"
     7  	"fmt"
     8  	"net"
     9  	"slices"
    10  	"strconv"
    11  	"strings"
    12  	"sync"
    13  	"sync/atomic"
    14  	"time"
    15  
    16  	. "github.com/bsm/ginkgo/v2"
    17  	. "github.com/bsm/gomega"
    18  
    19  	"github.com/redis/go-redis/v9"
    20  	"github.com/redis/go-redis/v9/internal/hashtag"
    21  )
    22  
    23  type clusterScenario struct {
    24  	ports   []string
    25  	nodeIDs []string
    26  	clients map[string]*redis.Client
    27  }
    28  
    29  func (s *clusterScenario) slots() []int {
    30  	return []int{0, 5461, 10923, 16384}
    31  }
    32  
    33  func (s *clusterScenario) masters() []*redis.Client {
    34  	result := make([]*redis.Client, 3)
    35  	for pos, port := range s.ports[:3] {
    36  		result[pos] = s.clients[port]
    37  	}
    38  	return result
    39  }
    40  
    41  func (s *clusterScenario) slaves() []*redis.Client {
    42  	result := make([]*redis.Client, 3)
    43  	for pos, port := range s.ports[3:] {
    44  		result[pos] = s.clients[port]
    45  	}
    46  	return result
    47  }
    48  
    49  func (s *clusterScenario) addrs() []string {
    50  	addrs := make([]string, len(s.ports))
    51  	for i, port := range s.ports {
    52  		addrs[i] = net.JoinHostPort("127.0.0.1", port)
    53  	}
    54  	return addrs
    55  }
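
A small aside: the three boundary values returned by slots() partition the full 16384-slot keyspace, so the master owning any key can be computed locally. A minimal sketch using the module-internal hashtag package already imported above (masterIndexForKey is a hypothetical helper, not part of the suite):

         // masterIndexForKey maps a key to the index of the master that owns
         // its hash slot, using the boundaries returned by slots().
         func masterIndexForKey(s *clusterScenario, key string) int {
         	slot := hashtag.Slot(key) // CRC16-based slot, honoring {hashtags}
         	bounds := s.slots()       // [0, 5461, 10923, 16384]
         	for i := 0; i < len(bounds)-1; i++ {
         		if slot >= bounds[i] && slot < bounds[i+1] {
         			return i
         		}
         	}
         	return -1 // unreachable for valid slots 0..16383
         }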
    56  
    57  func (s *clusterScenario) newClusterClientUnstable(opt *redis.ClusterOptions) *redis.ClusterClient {
    58  	opt.Addrs = s.addrs()
    59  	return redis.NewClusterClient(opt)
    60  }
    61  
    62  func (s *clusterScenario) newClusterClient(
    63  	ctx context.Context, opt *redis.ClusterOptions,
    64  ) *redis.ClusterClient {
    65  	client := s.newClusterClientUnstable(opt)
    66  
    67  	err := eventually(func() error {
    68  		if opt.ClusterSlots != nil {
    69  			return nil
    70  		}
    71  
    72  		state, err := client.LoadState(ctx)
    73  		if err != nil {
    74  			return err
    75  		}
    76  
    77  		if !state.IsConsistent(ctx) {
     78  			return errors.New("cluster state is not consistent")
    79  		}
    80  
    81  		return nil
    82  	}, 30*time.Second)
    83  	if err != nil {
    84  		panic(err)
    85  	}
    86  
    87  	return client
    88  }
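
The eventually helper used above is defined elsewhere in the test suite. A rough sketch of the polling shape it implies (the name and poll interval here are assumptions, not the suite's actual implementation):

         // eventuallySketch polls fn until it returns nil or the timeout
         // elapses, returning the last error seen. Illustrative only.
         func eventuallySketch(fn func() error, timeout time.Duration) error {
         	deadline := time.Now().Add(timeout)
         	var err error
         	for time.Now().Before(deadline) {
         		if err = fn(); err == nil {
         			return nil
         		}
         		time.Sleep(100 * time.Millisecond)
         	}
         	return err
         }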
    89  
    90  func (s *clusterScenario) Close() error {
    91  	ctx := context.TODO()
    92  	for _, master := range s.masters() {
    93  		if master == nil {
    94  			continue
    95  		}
    96  		err := master.FlushAll(ctx).Err()
    97  		if err != nil {
    98  			return err
    99  		}
   100  
    101  		// Since Redis 7.2, CLUSTER FORGET is propagated across the cluster,
    102  		// so issuing it on the master nodes alone should be sufficient.
   103  		for _, nID := range s.nodeIDs {
   104  			master.ClusterForget(ctx, nID)
   105  		}
   106  	}
   107  
   108  	return nil
   109  }
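
On servers older than 7.2, CLUSTER FORGET is not propagated, so a teardown would have to address every node directly. A sketch of that variant (closeLegacy is hypothetical and not part of the suite):

         // closeLegacy issues CLUSTER FORGET on every node, masters and
         // replicas alike, as required before forget propagation existed.
         func (s *clusterScenario) closeLegacy(ctx context.Context) {
         	for _, node := range s.clients {
         		for _, nID := range s.nodeIDs {
         			node.ClusterForget(ctx, nID)
         		}
         	}
         }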
   110  
   111  func configureClusterTopology(ctx context.Context, scenario *clusterScenario) error {
   112  	allowErrs := []string{
   113  		"ERR Slot 0 is already busy",
   114  		"ERR Slot 5461 is already busy",
   115  		"ERR Slot 10923 is already busy",
   116  		"ERR Slot 16384 is already busy",
   117  	}
   118  
   119  	err := collectNodeInformation(ctx, scenario)
   120  	if err != nil {
   121  		return err
   122  	}
   123  
   124  	// Meet cluster nodes.
   125  	for _, client := range scenario.clients {
   126  		err := client.ClusterMeet(ctx, "127.0.0.1", scenario.ports[0]).Err()
   127  		if err != nil {
   128  			return err
   129  		}
   130  	}
   131  
   132  	slots := scenario.slots()
   133  	for pos, master := range scenario.masters() {
   134  		err := master.ClusterAddSlotsRange(ctx, slots[pos], slots[pos+1]-1).Err()
    135  		if err != nil && !slices.Contains(allowErrs, err.Error()) {
   136  			return err
   137  		}
   138  	}
   139  
   140  	// Bootstrap slaves.
   141  	for idx, slave := range scenario.slaves() {
   142  		masterID := scenario.nodeIDs[idx]
   143  
    144  		// Wait until the master is visible in this slave's view of the cluster.
   145  		err := eventually(func() error {
   146  			s := slave.ClusterNodes(ctx).Val()
   147  			wanted := masterID
   148  			if !strings.Contains(s, wanted) {
   149  				return fmt.Errorf("%q does not contain %q", s, wanted)
   150  			}
   151  			return nil
   152  		}, 10*time.Second)
   153  		if err != nil {
   154  			return err
   155  		}
   156  
   157  		err = slave.ClusterReplicate(ctx, masterID).Err()
   158  		if err != nil {
   159  			return err
   160  		}
   161  	}
   162  
   163  	// Wait until all nodes have consistent info.
   164  	wanted := []redis.ClusterSlot{{
   165  		Start: 0,
   166  		End:   5460,
   167  		Nodes: []redis.ClusterNode{{
   168  			ID:   "",
   169  			Addr: "127.0.0.1:16600",
   170  		}, {
   171  			ID:   "",
   172  			Addr: "127.0.0.1:16603",
   173  		}},
   174  	}, {
   175  		Start: 5461,
   176  		End:   10922,
   177  		Nodes: []redis.ClusterNode{{
   178  			ID:   "",
   179  			Addr: "127.0.0.1:16601",
   180  		}, {
   181  			ID:   "",
   182  			Addr: "127.0.0.1:16604",
   183  		}},
   184  	}, {
   185  		Start: 10923,
   186  		End:   16383,
   187  		Nodes: []redis.ClusterNode{{
   188  			ID:   "",
   189  			Addr: "127.0.0.1:16602",
   190  		}, {
   191  			ID:   "",
   192  			Addr: "127.0.0.1:16605",
   193  		}},
   194  	}}
   195  
   196  	for _, client := range scenario.clients {
   197  		err := eventually(func() error {
   198  			res, err := client.ClusterSlots(ctx).Result()
   199  			if err != nil {
   200  				return err
   201  			}
   202  			return assertSlotsEqual(res, wanted)
   203  		}, 90*time.Second)
   204  		if err != nil {
   205  			return err
   206  		}
   207  	}
   208  
   209  	return nil
   210  }
   211  
   212  func collectNodeInformation(ctx context.Context, scenario *clusterScenario) error {
   213  	for pos, port := range scenario.ports {
   214  		client := redis.NewClient(&redis.Options{
   215  			Addr: ":" + port,
   216  		})
   217  
   218  		myID, err := client.ClusterMyID(ctx).Result()
   219  		if err != nil {
   220  			return err
   221  		}
   222  
   223  		scenario.clients[port] = client
   224  		scenario.nodeIDs[pos] = myID
   225  	}
   226  	return nil
   227  }
   228  
   229  func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
   230  outerLoop:
   231  	for _, s2 := range wanted {
   232  		for _, s1 := range slots {
   233  			if slotEqual(s1, s2) {
   234  				continue outerLoop
   235  			}
   236  		}
   237  		return fmt.Errorf("%v not found in %v", s2, slots)
   238  	}
   239  	return nil
   240  }
   241  
   242  func slotEqual(s1, s2 redis.ClusterSlot) bool {
   243  	if s1.Start != s2.Start {
   244  		return false
   245  	}
   246  	if s1.End != s2.End {
   247  		return false
   248  	}
   249  	if len(s1.Nodes) != len(s2.Nodes) {
   250  		return false
   251  	}
   252  	for i, n1 := range s1.Nodes {
   253  		if n1.Addr != s2.Nodes[i].Addr {
   254  			return false
   255  		}
   256  	}
   257  	return true
   258  }
   259  
   260  //------------------------------------------------------------------------------
   261  
   262  var _ = Describe("ClusterClient", func() {
   263  	var failover bool
   264  	var opt *redis.ClusterOptions
   265  	var client *redis.ClusterClient
   266  
   267  	assertClusterClient := func() {
   268  		It("do", func() {
   269  			val, err := client.Do(ctx, "ping").Result()
   270  			Expect(err).NotTo(HaveOccurred())
   271  			Expect(val).To(Equal("PONG"))
   272  		})
   273  
   274  		It("should GET/SET/DEL", func() {
   275  			err := client.Get(ctx, "A").Err()
   276  			Expect(err).To(Equal(redis.Nil))
   277  
   278  			err = client.Set(ctx, "A", "VALUE", 0).Err()
   279  			Expect(err).NotTo(HaveOccurred())
   280  
   281  			Eventually(func() string {
   282  				return client.Get(ctx, "A").Val()
   283  			}, 30*time.Second).Should(Equal("VALUE"))
   284  
   285  			cnt, err := client.Del(ctx, "A").Result()
   286  			Expect(err).NotTo(HaveOccurred())
   287  			Expect(cnt).To(Equal(int64(1)))
   288  		})
   289  
   290  		It("GET follows redirects", func() {
   291  			err := client.Set(ctx, "A", "VALUE", 0).Err()
   292  			Expect(err).NotTo(HaveOccurred())
   293  
   294  			if !failover {
   295  				Eventually(func() int64 {
   296  					nodes, err := client.Nodes(ctx, "A")
   297  					if err != nil {
   298  						return 0
   299  					}
   300  					return nodes[1].Client.DBSize(ctx).Val()
   301  				}, 30*time.Second).Should(Equal(int64(1)))
   302  
   303  				Eventually(func() error {
   304  					return client.SwapNodes(ctx, "A")
   305  				}, 30*time.Second).ShouldNot(HaveOccurred())
   306  			}
   307  
   308  			v, err := client.Get(ctx, "A").Result()
   309  			Expect(err).NotTo(HaveOccurred())
   310  			Expect(v).To(Equal("VALUE"))
   311  		})
   312  
   313  		It("SET follows redirects", func() {
   314  			if !failover {
   315  				Eventually(func() error {
   316  					return client.SwapNodes(ctx, "A")
   317  				}, 30*time.Second).ShouldNot(HaveOccurred())
   318  			}
   319  
   320  			err := client.Set(ctx, "A", "VALUE", 0).Err()
   321  			Expect(err).NotTo(HaveOccurred())
   322  
   323  			v, err := client.Get(ctx, "A").Result()
   324  			Expect(err).NotTo(HaveOccurred())
   325  			Expect(v).To(Equal("VALUE"))
   326  		})
   327  
   328  		It("distributes keys", func() {
   329  			for i := 0; i < 100; i++ {
   330  				err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
   331  				Expect(err).NotTo(HaveOccurred())
   332  			}
   333  
   334  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   335  				defer GinkgoRecover()
   336  				Eventually(func() string {
   337  					return master.Info(ctx, "keyspace").Val()
    338  				}, 30*time.Second).Should(Or(
    339  					// 100 keys split 32/36/32 across the three masters.
    340  					ContainSubstring("keys=32"),
    341  					ContainSubstring("keys=36"),
    342  				))
   343  				return nil
   344  			})
   345  
   346  			Expect(err).NotTo(HaveOccurred())
   347  		})
   348  
   349  		It("distributes keys when using EVAL", func() {
   350  			script := redis.NewScript(`
   351  				local r = redis.call('SET', KEYS[1], ARGV[1])
   352  				return r
   353  			`)
   354  
   355  			var key string
   356  			for i := 0; i < 100; i++ {
   357  				key = fmt.Sprintf("key%d", i)
   358  				err := script.Run(ctx, client, []string{key}, "value").Err()
   359  				Expect(err).NotTo(HaveOccurred())
   360  			}
   361  
   362  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   363  				defer GinkgoRecover()
   364  				Eventually(func() string {
   365  					return master.Info(ctx, "keyspace").Val()
    366  				}, 30*time.Second).Should(Or(
    367  					// 100 keys split 32/36/32 across the three masters.
    368  					ContainSubstring("keys=32"),
    369  					ContainSubstring("keys=36"),
    370  				))
   371  				return nil
   372  			})
   373  
   374  			Expect(err).NotTo(HaveOccurred())
   375  		})
   376  
   377  		It("distributes scripts when using Script Load", func() {
   378  			client.ScriptFlush(ctx)
   379  
   380  			script := redis.NewScript(`return 'Unique script'`)
   381  
   382  			script.Load(ctx, client)
   383  
   384  			err := client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
   385  				defer GinkgoRecover()
   386  
   387  				val, _ := script.Exists(ctx, shard).Result()
   388  				Expect(val[0]).To(Equal(true))
   389  				return nil
   390  			})
   391  			Expect(err).NotTo(HaveOccurred())
   392  		})
   393  
   394  		It("checks all shards when using Script Exists", func() {
   395  			client.ScriptFlush(ctx)
   396  
   397  			script := redis.NewScript(`return 'First script'`)
   398  			lostScriptSrc := `return 'Lost script'`
   399  			lostScript := redis.NewScript(lostScriptSrc)
   400  
   401  			script.Load(ctx, client)
   402  			client.Do(ctx, "script", "load", lostScriptSrc)
   403  
   404  			val, _ := client.ScriptExists(ctx, script.Hash(), lostScript.Hash()).Result()
   405  
   406  			Expect(val).To(Equal([]bool{true, false}))
   407  		})
   408  
   409  		It("flushes scripts from all shards when using ScriptFlush", func() {
   410  			script := redis.NewScript(`return 'Unnecessary script'`)
   411  			script.Load(ctx, client)
   412  
   413  			val, _ := client.ScriptExists(ctx, script.Hash()).Result()
   414  			Expect(val).To(Equal([]bool{true}))
   415  
   416  			client.ScriptFlush(ctx)
   417  
   418  			val, _ = client.ScriptExists(ctx, script.Hash()).Result()
   419  			Expect(val).To(Equal([]bool{false}))
   420  		})
   421  
   422  		It("supports Watch", func() {
   423  			var incr func(string) error
   424  
   425  			// Transactionally increments key using GET and SET commands.
   426  			incr = func(key string) error {
   427  				err := client.Watch(ctx, func(tx *redis.Tx) error {
   428  					n, err := tx.Get(ctx, key).Int64()
   429  					if err != nil && err != redis.Nil {
   430  						return err
   431  					}
   432  
   433  					_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   434  						pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
   435  						return nil
   436  					})
   437  					return err
   438  				}, key)
   439  				if err == redis.TxFailedErr {
   440  					return incr(key)
   441  				}
   442  				return err
   443  			}
   444  
   445  			var wg sync.WaitGroup
   446  			for i := 0; i < 100; i++ {
   447  				wg.Add(1)
   448  				go func() {
   449  					defer GinkgoRecover()
   450  					defer wg.Done()
   451  
   452  					err := incr("key")
   453  					Expect(err).NotTo(HaveOccurred())
   454  				}()
   455  			}
   456  			wg.Wait()
   457  
   458  			Eventually(func() string {
   459  				return client.Get(ctx, "key").Val()
   460  			}, 30*time.Second).Should(Equal("100"))
   461  		})
   462  
   463  		Describe("pipelining", func() {
   464  			var pipe *redis.Pipeline
   465  
   466  			assertPipeline := func(keys []string) {
   467  
   468  				It("follows redirects", func() {
   469  					if !failover {
   470  						for _, key := range keys {
   471  							Eventually(func() error {
   472  								return client.SwapNodes(ctx, key)
   473  							}, 30*time.Second).ShouldNot(HaveOccurred())
   474  						}
   475  					}
   476  
   477  					for i, key := range keys {
   478  						pipe.Set(ctx, key, key+"_value", 0)
   479  						pipe.Expire(ctx, key, time.Duration(i+1)*time.Hour)
   480  					}
   481  					cmds, err := pipe.Exec(ctx)
   482  					Expect(err).NotTo(HaveOccurred())
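         					// 7 keys × (SET + EXPIRE) = 14 queued commands.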
   483  					Expect(cmds).To(HaveLen(14))
   484  
   485  					// Check that all keys are set.
   486  					for _, key := range keys {
   487  						Eventually(func() string {
   488  							return client.Get(ctx, key).Val()
   489  						}, 30*time.Second).Should(Equal(key + "_value"))
   490  					}
   491  
   492  					if !failover {
   493  						for _, key := range keys {
   494  							Eventually(func() error {
   495  								return client.SwapNodes(ctx, key)
   496  							}, 30*time.Second).ShouldNot(HaveOccurred())
   497  						}
   498  					}
   499  
   500  					for _, key := range keys {
   501  						pipe.Get(ctx, key)
   502  						pipe.TTL(ctx, key)
   503  					}
   504  					cmds, err = pipe.Exec(ctx)
   505  					Expect(err).NotTo(HaveOccurred())
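         					// 7 keys × (GET + TTL) = 14 results.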
   506  					Expect(cmds).To(HaveLen(14))
   507  
   508  					for i, key := range keys {
   509  						get := cmds[i*2].(*redis.StringCmd)
   510  						Expect(get.Val()).To(Equal(key + "_value"))
   511  
   512  						ttl := cmds[(i*2)+1].(*redis.DurationCmd)
   513  						dur := time.Duration(i+1) * time.Hour
   514  						Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
   515  					}
   516  				})
   517  
   518  				It("works with missing keys", func() {
   519  					pipe.Set(ctx, "A{s}", "A_value", 0)
   520  					pipe.Set(ctx, "C{s}", "C_value", 0)
   521  					_, err := pipe.Exec(ctx)
   522  					Expect(err).NotTo(HaveOccurred())
   523  
   524  					a := pipe.Get(ctx, "A{s}")
   525  					b := pipe.Get(ctx, "B{s}")
   526  					c := pipe.Get(ctx, "C{s}")
   527  					cmds, err := pipe.Exec(ctx)
   528  					Expect(err).To(Equal(redis.Nil))
   529  					Expect(cmds).To(HaveLen(3))
   530  
   531  					Expect(a.Err()).NotTo(HaveOccurred())
   532  					Expect(a.Val()).To(Equal("A_value"))
   533  
   534  					Expect(b.Err()).To(Equal(redis.Nil))
   535  					Expect(b.Val()).To(Equal(""))
   536  
   537  					Expect(c.Err()).NotTo(HaveOccurred())
   538  					Expect(c.Val()).To(Equal("C_value"))
   539  				})
   540  			}
   541  
   542  			Describe("with Pipeline", func() {
   543  				BeforeEach(func() {
   544  					pipe = client.Pipeline().(*redis.Pipeline)
   545  				})
   546  
   547  				AfterEach(func() {})
   548  
   549  				keys := []string{"A", "B", "C", "D", "E", "F", "G"}
   550  				assertPipeline(keys)
   551  
   552  				It("doesn't fail node with context.Canceled error", func() {
   553  					ctx, cancel := context.WithCancel(context.Background())
   554  					cancel()
   555  					pipe.Set(ctx, "A", "A_value", 0)
   556  					_, err := pipe.Exec(ctx)
   557  
   558  					Expect(err).To(HaveOccurred())
   559  					Expect(errors.Is(err, context.Canceled)).To(BeTrue())
   560  
   561  					clientNodes, _ := client.Nodes(ctx, "A")
   562  
   563  					for _, node := range clientNodes {
   564  						Expect(node.Failing()).To(BeFalse())
   565  					}
   566  				})
   567  
   568  				It("doesn't fail node with context.DeadlineExceeded error", func() {
   569  					ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
   570  					defer cancel()
   571  
   572  					pipe.Set(ctx, "A", "A_value", 0)
   573  					_, err := pipe.Exec(ctx)
   574  
   575  					Expect(err).To(HaveOccurred())
   576  					Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
   577  
   578  					clientNodes, _ := client.Nodes(ctx, "A")
   579  
   580  					for _, node := range clientNodes {
   581  						Expect(node.Failing()).To(BeFalse())
   582  					}
   583  				})
   584  			})
   585  
   586  			Describe("with TxPipeline", func() {
   587  				BeforeEach(func() {
   588  					pipe = client.TxPipeline().(*redis.Pipeline)
   589  				})
   590  
   591  				AfterEach(func() {})
   592  
   593  				// TxPipeline doesn't support cross slot commands.
   594  				// Use hashtag to force all keys to the same slot.
   595  				keys := []string{"A{s}", "B{s}", "C{s}", "D{s}", "E{s}", "F{s}", "G{s}"}
   596  				assertPipeline(keys)
   597  
   598  				// make sure CrossSlot error is returned
   599  				It("returns CrossSlot error", func() {
   600  					pipe.Set(ctx, "A{s}", "A_value", 0)
   601  					pipe.Set(ctx, "B{t}", "B_value", 0)
   602  					Expect(hashtag.Slot("A{s}")).NotTo(Equal(hashtag.Slot("B{t}")))
   603  					_, err := pipe.Exec(ctx)
   604  					Expect(err).To(MatchError(redis.ErrCrossSlot))
   605  				})
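
The {hashtag} braces above are what colocate keys: only the text inside the braces is hashed, so A{s} and B{s} share a slot while B{t} lands elsewhere. A standalone sketch using the internal hashtag package (hashtagDemo is illustrative only):

         func hashtagDemo() {
         	slotAs := hashtag.Slot("A{s}") // hashes "s"
         	slotBs := hashtag.Slot("B{s}") // hashes "s": same slot as A{s}
         	slotBt := hashtag.Slot("B{t}") // hashes "t": a different slot
         	fmt.Println(slotAs == slotBs, slotAs == slotBt) // true false
         }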
   606  
   607  				It("works normally with keyless commands and no CrossSlot error", func() {
   608  					pipe.Set(ctx, "A{s}", "A_value", 0)
   609  					pipe.Ping(ctx)
   610  					pipe.Set(ctx, "B{s}", "B_value", 0)
   611  					pipe.Ping(ctx)
   612  					_, err := pipe.Exec(ctx)
   613  					Expect(err).To(Not(HaveOccurred()))
   614  				})
   615  
   616  				// doesn't fail when no commands are queued
   617  				It("returns no error when there are no commands", func() {
   618  					_, err := pipe.Exec(ctx)
   619  					Expect(err).NotTo(HaveOccurred())
   620  				})
   621  			})
   622  		})
   623  
   624  		It("supports PubSub", func() {
   625  			pubsub := client.Subscribe(ctx, "mychannel")
   626  			defer pubsub.Close()
   627  
   628  			Eventually(func() error {
   629  				_, err := client.Publish(ctx, "mychannel", "hello").Result()
   630  				if err != nil {
   631  					return err
   632  				}
   633  
   634  				msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
   635  				if err != nil {
   636  					return err
   637  				}
   638  
   639  				_, ok := msg.(*redis.Message)
   640  				if !ok {
   641  					return fmt.Errorf("got %T, wanted *redis.Message", msg)
   642  				}
   643  
   644  				return nil
   645  			}, 30*time.Second).ShouldNot(HaveOccurred())
   646  		})
   647  
   648  		It("supports PubSub with ReadOnly option", func() {
   649  			opt = redisClusterOptions()
   650  			opt.ReadOnly = true
   651  			client = cluster.newClusterClient(ctx, opt)
   652  
   653  			pubsub := client.Subscribe(ctx, "mychannel")
   654  			defer pubsub.Close()
   655  
   656  			Eventually(func() error {
   657  				var masterPubsubChannels atomic.Int64
   658  				var slavePubsubChannels atomic.Int64
   659  
   660  				err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   661  					info := master.InfoMap(ctx, "stats")
   662  					if info.Err() != nil {
   663  						return info.Err()
   664  					}
   665  
   666  					pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels"))
   667  					if err != nil {
   668  						return err
   669  					}
   670  
   671  					masterPubsubChannels.Add(int64(pc))
   672  
   673  					return nil
   674  				})
   675  				if err != nil {
   676  					return err
   677  				}
   678  
   679  				err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
   680  					info := slave.InfoMap(ctx, "stats")
   681  					if info.Err() != nil {
   682  						return info.Err()
   683  					}
   684  
   685  					pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels"))
   686  					if err != nil {
   687  						return err
   688  					}
   689  
   690  					slavePubsubChannels.Add(int64(pc))
   691  
   692  					return nil
   693  				})
   694  				if err != nil {
   695  					return err
   696  				}
   697  
   698  				if c := masterPubsubChannels.Load(); c != int64(0) {
   699  					return fmt.Errorf("total master pubsub_channels is %d; expected 0", c)
   700  				}
   701  
   702  				if c := slavePubsubChannels.Load(); c != int64(1) {
   703  					return fmt.Errorf("total slave pubsub_channels is %d; expected 1", c)
   704  				}
   705  
   706  				return nil
   707  			}, 30*time.Second).ShouldNot(HaveOccurred())
   708  
   709  			Eventually(func() error {
   710  				_, err := client.Publish(ctx, "mychannel", "hello").Result()
   711  				if err != nil {
   712  					return err
   713  				}
   714  
   715  				msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
   716  				if err != nil {
   717  					return err
   718  				}
   719  
   720  				_, ok := msg.(*redis.Message)
   721  				if !ok {
   722  					return fmt.Errorf("got %T, wanted *redis.Message", msg)
   723  				}
   724  
   725  				return nil
   726  			}, 30*time.Second).ShouldNot(HaveOccurred())
   727  		})
   728  
   729  		It("supports sharded PubSub", func() {
   730  			pubsub := client.SSubscribe(ctx, "mychannel")
   731  			defer pubsub.Close()
   732  
   733  			Eventually(func() error {
   734  				_, err := client.SPublish(ctx, "mychannel", "hello").Result()
   735  				if err != nil {
   736  					return err
   737  				}
   738  
   739  				msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
   740  				if err != nil {
   741  					return err
   742  				}
   743  
   744  				_, ok := msg.(*redis.Message)
   745  				if !ok {
   746  					return fmt.Errorf("got %T, wanted *redis.Message", msg)
   747  				}
   748  
   749  				return nil
   750  			}, 30*time.Second).ShouldNot(HaveOccurred())
   751  		})
   752  
   753  		It("supports sharded PubSub with ReadOnly option", func() {
   754  			opt = redisClusterOptions()
   755  			opt.ReadOnly = true
   756  			client = cluster.newClusterClient(ctx, opt)
   757  
   758  			pubsub := client.SSubscribe(ctx, "mychannel")
   759  			defer pubsub.Close()
   760  
   761  			Eventually(func() error {
   762  				var masterPubsubShardChannels atomic.Int64
   763  				var slavePubsubShardChannels atomic.Int64
   764  
   765  				err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   766  					info := master.InfoMap(ctx, "stats")
   767  					if info.Err() != nil {
   768  						return info.Err()
   769  					}
   770  
   771  					pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels"))
   772  					if err != nil {
   773  						return err
   774  					}
   775  
   776  					masterPubsubShardChannels.Add(int64(pc))
   777  
   778  					return nil
   779  				})
   780  				if err != nil {
   781  					return err
   782  				}
   783  
   784  				err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
   785  					info := slave.InfoMap(ctx, "stats")
   786  					if info.Err() != nil {
   787  						return info.Err()
   788  					}
   789  
   790  					pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels"))
   791  					if err != nil {
   792  						return err
   793  					}
   794  
   795  					slavePubsubShardChannels.Add(int64(pc))
   796  
   797  					return nil
   798  				})
   799  				if err != nil {
   800  					return err
   801  				}
   802  
   803  				if c := masterPubsubShardChannels.Load(); c != int64(0) {
   804  					return fmt.Errorf("total master pubsubshard_channels is %d; expected 0", c)
   805  				}
   806  
   807  				if c := slavePubsubShardChannels.Load(); c != int64(1) {
   808  					return fmt.Errorf("total slave pubsubshard_channels is %d; expected 1", c)
   809  				}
   810  
   811  				return nil
   812  			}, 30*time.Second).ShouldNot(HaveOccurred())
   813  
   814  			Eventually(func() error {
   815  				_, err := client.SPublish(ctx, "mychannel", "hello").Result()
   816  				if err != nil {
   817  					return err
   818  				}
   819  
   820  				msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
   821  				if err != nil {
   822  					return err
   823  				}
   824  
   825  				_, ok := msg.(*redis.Message)
   826  				if !ok {
   827  					return fmt.Errorf("got %T, wanted *redis.Message", msg)
   828  				}
   829  
   830  				return nil
   831  			}, 30*time.Second).ShouldNot(HaveOccurred())
   832  		})
   833  
   834  		It("supports PubSub.Ping without channels", func() {
   835  			pubsub := client.Subscribe(ctx)
   836  			defer pubsub.Close()
   837  
   838  			err := pubsub.Ping(ctx)
   839  			Expect(err).NotTo(HaveOccurred())
   840  		})
   841  	}
   842  
   843  	Describe("ClusterClient PROTO 2", func() {
   844  		BeforeEach(func() {
   845  			opt = redisClusterOptions()
   846  			opt.Protocol = 2
   847  			client = cluster.newClusterClient(ctx, opt)
   848  
   849  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   850  				return master.FlushDB(ctx).Err()
   851  			})
   852  			Expect(err).NotTo(HaveOccurred())
   853  		})
   854  
   855  		AfterEach(func() {
   856  			_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   857  				return master.FlushDB(ctx).Err()
   858  			})
   859  			Expect(client.Close()).NotTo(HaveOccurred())
   860  		})
   861  
   862  		It("should CLUSTER PROTO 2", func() {
   863  			_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
   864  				val, err := c.Do(ctx, "HELLO").Result()
   865  				Expect(err).NotTo(HaveOccurred())
   866  				Expect(val).Should(ContainElements("proto", int64(2)))
   867  				return nil
   868  			})
   869  		})
   870  	})
   871  
   872  	Describe("ClusterClient", func() {
   873  		BeforeEach(func() {
   874  			opt = redisClusterOptions()
   875  			opt.ClientName = "cluster_hi"
   876  			client = cluster.newClusterClient(ctx, opt)
   877  
   878  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   879  				return master.FlushDB(ctx).Err()
   880  			})
   881  			Expect(err).NotTo(HaveOccurred())
   882  		})
   883  
   884  		AfterEach(func() {
   885  			_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   886  				return master.FlushDB(ctx).Err()
   887  			})
   888  			Expect(client.Close()).NotTo(HaveOccurred())
   889  		})
   890  
   891  		It("returns pool stats", func() {
   892  			stats := client.PoolStats()
   893  			Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
   894  		})
   895  
   896  		It("returns an error when there are no attempts left", func() {
   897  			opt := redisClusterOptions()
   898  			opt.MaxRedirects = -1
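         			// MaxRedirects = -1 disables MOVED/ASK redirect retries entirely,
         			// so the MOVED error below surfaces to the caller.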
   899  			client := cluster.newClusterClient(ctx, opt)
   900  
   901  			Eventually(func() error {
   902  				return client.SwapNodes(ctx, "A")
   903  			}, 30*time.Second).ShouldNot(HaveOccurred())
   904  
   905  			err := client.Get(ctx, "A").Err()
   906  			Expect(err).To(HaveOccurred())
   907  			Expect(err.Error()).To(ContainSubstring("MOVED"))
   908  
   909  			Expect(client.Close()).NotTo(HaveOccurred())
   910  		})
   911  
   912  		It("determines hash slots correctly for generic commands", func() {
   913  			opt := redisClusterOptions()
   914  			opt.MaxRedirects = -1
   915  			client := cluster.newClusterClient(ctx, opt)
   916  
   917  			err := client.Do(ctx, "GET", "A").Err()
   918  			Expect(err).To(Equal(redis.Nil))
   919  
   920  			err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
   921  			Expect(err).To(Equal(redis.Nil))
   922  
   923  			Eventually(func() error {
   924  				return client.SwapNodes(ctx, "A")
   925  			}, 30*time.Second).ShouldNot(HaveOccurred())
   926  
   927  			err = client.Do(ctx, "GET", "A").Err()
   928  			Expect(err).To(HaveOccurred())
   929  			Expect(err.Error()).To(ContainSubstring("MOVED"))
   930  
   931  			err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
   932  			Expect(err).To(HaveOccurred())
   933  			Expect(err.Error()).To(ContainSubstring("MOVED"))
   934  
   935  			Expect(client.Close()).NotTo(HaveOccurred())
   936  		})
   937  
   938  		It("follows node redirection immediately", func() {
   939  			// Configure retry backoffs far in excess of the expected duration of redirection
   940  			opt := redisClusterOptions()
   941  			opt.MinRetryBackoff = 10 * time.Minute
   942  			opt.MaxRetryBackoff = 20 * time.Minute
   943  			client := cluster.newClusterClient(ctx, opt)
   944  
   945  			Eventually(func() error {
   946  				return client.SwapNodes(ctx, "A")
   947  			}, 30*time.Second).ShouldNot(HaveOccurred())
   948  
    949  			// The context deadline is far below the minimum retry backoff, so this
    950  			// succeeds only if the MOVED redirect is followed without a backoff sleep.
   951  			redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
   952  			defer cancel()
   953  
   954  			err := client.Set(redirCtx, "A", "VALUE", 0).Err()
   955  			Expect(err).NotTo(HaveOccurred())
   956  
   957  			v, err := client.Get(redirCtx, "A").Result()
   958  			Expect(err).NotTo(HaveOccurred())
   959  			Expect(v).To(Equal("VALUE"))
   960  
   961  			Expect(client.Close()).NotTo(HaveOccurred())
   962  		})
   963  
   964  		It("calls fn for every master node", func() {
   965  			for i := 0; i < 10; i++ {
   966  				Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
   967  			}
   968  
   969  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
   970  				return master.FlushDB(ctx).Err()
   971  			})
   972  			Expect(err).NotTo(HaveOccurred())
   973  
   974  			size, err := client.DBSize(ctx).Result()
   975  			Expect(err).NotTo(HaveOccurred())
   976  			Expect(size).To(Equal(int64(0)))
   977  		})
   978  
   979  		It("should CLUSTER SLOTS", func() {
   980  			res, err := client.ClusterSlots(ctx).Result()
   981  			Expect(err).NotTo(HaveOccurred())
   982  			Expect(res).To(HaveLen(3))
   983  
   984  			wanted := []redis.ClusterSlot{{
   985  				Start: 0,
   986  				End:   5460,
   987  				Nodes: []redis.ClusterNode{{
   988  					ID:   "",
   989  					Addr: "127.0.0.1:16600",
   990  				}, {
   991  					ID:   "",
   992  					Addr: "127.0.0.1:16603",
   993  				}},
   994  			}, {
   995  				Start: 5461,
   996  				End:   10922,
   997  				Nodes: []redis.ClusterNode{{
   998  					ID:   "",
   999  					Addr: "127.0.0.1:16601",
  1000  				}, {
  1001  					ID:   "",
  1002  					Addr: "127.0.0.1:16604",
  1003  				}},
  1004  			}, {
  1005  				Start: 10923,
  1006  				End:   16383,
  1007  				Nodes: []redis.ClusterNode{{
  1008  					ID:   "",
  1009  					Addr: "127.0.0.1:16602",
  1010  				}, {
  1011  					ID:   "",
  1012  					Addr: "127.0.0.1:16605",
  1013  				}},
  1014  			}}
  1015  			Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
  1016  		})
  1017  
  1018  		It("should CLUSTER SHARDS", func() {
  1019  			res, err := client.ClusterShards(ctx).Result()
  1020  			Expect(err).NotTo(HaveOccurred())
  1021  			Expect(res).NotTo(BeEmpty())
  1022  
  1023  			// Iterate over the ClusterShard results and validate the fields.
  1024  			for _, shard := range res {
  1025  				Expect(shard.Slots).NotTo(BeEmpty())
  1026  				for _, slotRange := range shard.Slots {
  1027  					Expect(slotRange.Start).To(BeNumerically(">=", 0))
  1028  					Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
  1029  				}
  1030  
  1031  				Expect(shard.Nodes).NotTo(BeEmpty())
  1032  				for _, node := range shard.Nodes {
  1033  					Expect(node.ID).NotTo(BeEmpty())
  1034  					Expect(node.Endpoint).NotTo(BeEmpty())
  1035  					Expect(node.IP).NotTo(BeEmpty())
  1036  					Expect(node.Port).To(BeNumerically(">", 0))
  1037  
  1038  					validRoles := []string{"master", "slave", "replica"}
  1039  					Expect(validRoles).To(ContainElement(node.Role))
  1040  
  1041  					Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))
  1042  
  1043  					validHealthStatuses := []string{"online", "failed", "loading"}
  1044  					Expect(validHealthStatuses).To(ContainElement(node.Health))
  1045  				}
  1046  			}
  1047  		})
  1048  
  1049  		It("should CLUSTER LINKS", func() {
  1050  			res, err := client.ClusterLinks(ctx).Result()
  1051  			Expect(err).NotTo(HaveOccurred())
  1052  			Expect(res).NotTo(BeEmpty())
  1053  
   1054  		// Iterate over the ClusterLink results and validate the fields.
  1055  			for _, link := range res {
  1056  
  1057  				Expect(link.Direction).NotTo(BeEmpty())
  1058  				Expect([]string{"from", "to"}).To(ContainElement(link.Direction))
  1059  				Expect(link.Node).NotTo(BeEmpty())
  1060  				Expect(link.CreateTime).To(BeNumerically(">", 0))
  1061  
  1062  				Expect(link.Events).NotTo(BeEmpty())
  1063  				validEventChars := []rune{'r', 'w'}
  1064  				for _, eventChar := range link.Events {
  1065  					Expect(validEventChars).To(ContainElement(eventChar))
  1066  				}
  1067  
  1068  				Expect(link.SendBufferAllocated).To(BeNumerically(">=", 0))
  1069  				Expect(link.SendBufferUsed).To(BeNumerically(">=", 0))
  1070  			}
  1071  		})
  1072  
   1073  		It("should set the cluster client name", func() {
  1074  			err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
  1075  				return c.Ping(ctx).Err()
  1076  			})
  1077  			Expect(err).NotTo(HaveOccurred())
  1078  
  1079  			_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
  1080  				val, err := c.ClientList(ctx).Result()
  1081  				Expect(err).NotTo(HaveOccurred())
  1082  				Expect(val).Should(ContainSubstring("name=cluster_hi"))
  1083  				return nil
  1084  			})
  1085  		})
  1086  
  1087  		It("should CLUSTER PROTO 3", func() {
  1088  			_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
  1089  				val, err := c.Do(ctx, "HELLO").Result()
  1090  				Expect(err).NotTo(HaveOccurred())
  1091  				Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
  1092  				return nil
  1093  			})
  1094  		})
  1095  
  1096  		It("should CLUSTER MYSHARDID", func() {
  1097  			shardID, err := client.ClusterMyShardID(ctx).Result()
  1098  			Expect(err).NotTo(HaveOccurred())
  1099  			Expect(shardID).ToNot(BeEmpty())
  1100  		})
  1101  
  1102  		It("should CLUSTER NODES", func() {
  1103  			res, err := client.ClusterNodes(ctx).Result()
  1104  			Expect(err).NotTo(HaveOccurred())
  1105  			Expect(len(res)).To(BeNumerically(">", 400))
  1106  		})
  1107  
  1108  		It("should CLUSTER INFO", func() {
  1109  			res, err := client.ClusterInfo(ctx).Result()
  1110  			Expect(err).NotTo(HaveOccurred())
  1111  			Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
  1112  		})
  1113  
  1114  		It("should CLUSTER KEYSLOT", func() {
  1115  			hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
  1116  			Expect(err).NotTo(HaveOccurred())
  1117  			Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
  1118  		})
  1119  
  1120  		It("should CLUSTER GETKEYSINSLOT", func() {
  1121  			keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
  1122  			Expect(err).NotTo(HaveOccurred())
  1123  			Expect(len(keys)).To(Equal(0))
  1124  		})
  1125  
  1126  		It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
  1127  			n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
  1128  			Expect(err).NotTo(HaveOccurred())
  1129  			Expect(n).To(Equal(int64(0)))
  1130  		})
  1131  
  1132  		It("should CLUSTER COUNTKEYSINSLOT", func() {
  1133  			n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
  1134  			Expect(err).NotTo(HaveOccurred())
  1135  			Expect(n).To(Equal(int64(0)))
  1136  		})
  1137  
  1138  		It("should CLUSTER SAVECONFIG", func() {
  1139  			res, err := client.ClusterSaveConfig(ctx).Result()
  1140  			Expect(err).NotTo(HaveOccurred())
  1141  			Expect(res).To(Equal("OK"))
  1142  		})
  1143  
  1144  		It("should CLUSTER SLAVES", func() {
  1145  			nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
  1146  			Expect(err).NotTo(HaveOccurred())
  1147  			Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
  1148  			Expect(nodesList).Should(HaveLen(1))
  1149  		})
  1150  
  1151  		It("should RANDOMKEY", func() {
  1152  			const nkeys = 100
  1153  
  1154  			for i := 0; i < nkeys; i++ {
  1155  				err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
  1156  				Expect(err).NotTo(HaveOccurred())
  1157  			}
  1158  
  1159  			var keys []string
  1160  			addKey := func(key string) {
  1161  				for _, k := range keys {
  1162  					if k == key {
  1163  						return
  1164  					}
  1165  				}
  1166  				keys = append(keys, key)
  1167  			}
  1168  
  1169  			for i := 0; i < nkeys*10; i++ {
  1170  				key := client.RandomKey(ctx).Val()
  1171  				addKey(key)
  1172  			}
  1173  
  1174  			Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
  1175  		})
  1176  
  1177  		It("supports Process hook", func() {
  1178  			testCtx, cancel := context.WithCancel(ctx)
  1179  			defer cancel()
  1180  
  1181  			err := client.Ping(ctx).Err()
  1182  			Expect(err).NotTo(HaveOccurred())
  1183  
  1184  			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
  1185  				return node.Ping(ctx).Err()
  1186  			})
  1187  			Expect(err).NotTo(HaveOccurred())
  1188  
  1189  			var stack []string
  1190  
  1191  			clusterHook := &hook{
  1192  				processHook: func(hook redis.ProcessHook) redis.ProcessHook {
  1193  					return func(ctx context.Context, cmd redis.Cmder) error {
  1194  						select {
  1195  						case <-testCtx.Done():
  1196  							return hook(ctx, cmd)
  1197  						default:
  1198  						}
  1199  
  1200  						Expect(cmd.String()).To(Equal("ping: "))
  1201  						stack = append(stack, "cluster.BeforeProcess")
  1202  
  1203  						err := hook(ctx, cmd)
  1204  
  1205  						Expect(cmd.String()).To(Equal("ping: PONG"))
  1206  						stack = append(stack, "cluster.AfterProcess")
  1207  
  1208  						return err
  1209  					}
  1210  				},
  1211  			}
  1212  			client.AddHook(clusterHook)
  1213  
  1214  			nodeHook := &hook{
  1215  				processHook: func(hook redis.ProcessHook) redis.ProcessHook {
  1216  					return func(ctx context.Context, cmd redis.Cmder) error {
  1217  						select {
  1218  						case <-testCtx.Done():
  1219  							return hook(ctx, cmd)
  1220  						default:
  1221  						}
  1222  
  1223  						Expect(cmd.String()).To(Equal("ping: "))
  1224  						stack = append(stack, "shard.BeforeProcess")
  1225  
  1226  						err := hook(ctx, cmd)
  1227  
  1228  						Expect(cmd.String()).To(Equal("ping: PONG"))
  1229  						stack = append(stack, "shard.AfterProcess")
  1230  
  1231  						return err
  1232  					}
  1233  				},
  1234  			}
  1235  
  1236  			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
  1237  				node.AddHook(nodeHook)
  1238  				return nil
  1239  			})
  1240  
  1241  			err = client.Ping(ctx).Err()
  1242  			Expect(err).NotTo(HaveOccurred())
  1243  			Expect(stack).To(Equal([]string{
  1244  				"cluster.BeforeProcess",
  1245  				"shard.BeforeProcess",
  1246  				"shard.AfterProcess",
  1247  				"cluster.AfterProcess",
  1248  			}))
  1249  		})
  1250  
  1251  		It("supports Pipeline hook", func() {
  1252  			err := client.Ping(ctx).Err()
  1253  			Expect(err).NotTo(HaveOccurred())
  1254  
  1255  			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
  1256  				return node.Ping(ctx).Err()
  1257  			})
  1258  			Expect(err).NotTo(HaveOccurred())
  1259  
  1260  			var stack []string
  1261  
  1262  			client.AddHook(&hook{
  1263  				processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
  1264  					return func(ctx context.Context, cmds []redis.Cmder) error {
  1265  						Expect(cmds).To(HaveLen(1))
  1266  						Expect(cmds[0].String()).To(Equal("ping: "))
  1267  						stack = append(stack, "cluster.BeforeProcessPipeline")
  1268  
  1269  						err := hook(ctx, cmds)
  1270  
  1271  						Expect(cmds).To(HaveLen(1))
  1272  						Expect(cmds[0].String()).To(Equal("ping: PONG"))
  1273  						stack = append(stack, "cluster.AfterProcessPipeline")
  1274  
  1275  						return err
  1276  					}
  1277  				},
  1278  			})
  1279  
  1280  			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
  1281  				node.AddHook(&hook{
  1282  					processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
  1283  						return func(ctx context.Context, cmds []redis.Cmder) error {
  1284  							Expect(cmds).To(HaveLen(1))
  1285  							Expect(cmds[0].String()).To(Equal("ping: "))
  1286  							stack = append(stack, "shard.BeforeProcessPipeline")
  1287  
  1288  							err := hook(ctx, cmds)
  1289  
  1290  							Expect(cmds).To(HaveLen(1))
  1291  							Expect(cmds[0].String()).To(Equal("ping: PONG"))
  1292  							stack = append(stack, "shard.AfterProcessPipeline")
  1293  
  1294  							return err
  1295  						}
  1296  					},
  1297  				})
  1298  				return nil
  1299  			})
  1300  
  1301  			_, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  1302  				pipe.Ping(ctx)
  1303  				return nil
  1304  			})
  1305  			Expect(err).NotTo(HaveOccurred())
  1306  			Expect(stack).To(Equal([]string{
  1307  				"cluster.BeforeProcessPipeline",
  1308  				"shard.BeforeProcessPipeline",
  1309  				"shard.AfterProcessPipeline",
  1310  				"cluster.AfterProcessPipeline",
  1311  			}))
  1312  		})
  1313  
  1314  		It("supports TxPipeline hook", func() {
  1315  			err := client.Ping(ctx).Err()
  1316  			Expect(err).NotTo(HaveOccurred())
  1317  
  1318  			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
  1319  				return node.Ping(ctx).Err()
  1320  			})
  1321  			Expect(err).NotTo(HaveOccurred())
  1322  
  1323  			var stack []string
  1324  
  1325  			client.AddHook(&hook{
  1326  				processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
  1327  					return func(ctx context.Context, cmds []redis.Cmder) error {
  1328  						Expect(cmds).To(HaveLen(3))
  1329  						Expect(cmds[1].String()).To(Equal("ping: "))
  1330  						stack = append(stack, "cluster.BeforeProcessPipeline")
  1331  
  1332  						err := hook(ctx, cmds)
  1333  
  1334  						Expect(cmds).To(HaveLen(3))
  1335  						Expect(cmds[1].String()).To(Equal("ping: PONG"))
  1336  						stack = append(stack, "cluster.AfterProcessPipeline")
  1337  
  1338  						return err
  1339  					}
  1340  				},
  1341  			})
  1342  
  1343  			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
  1344  				node.AddHook(&hook{
  1345  					processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
  1346  						return func(ctx context.Context, cmds []redis.Cmder) error {
  1347  							Expect(cmds).To(HaveLen(3))
  1348  							Expect(cmds[1].String()).To(Equal("ping: "))
  1349  							stack = append(stack, "shard.BeforeProcessPipeline")
  1350  
  1351  							err := hook(ctx, cmds)
  1352  
  1353  							Expect(cmds).To(HaveLen(3))
  1354  							Expect(cmds[1].String()).To(Equal("ping: PONG"))
  1355  							stack = append(stack, "shard.AfterProcessPipeline")
  1356  
  1357  							return err
  1358  						}
  1359  					},
  1360  				})
  1361  				return nil
  1362  			})
  1363  
  1364  			_, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  1365  				pipe.Ping(ctx)
  1366  				return nil
  1367  			})
  1368  			Expect(err).NotTo(HaveOccurred())
  1369  			Expect(stack).To(Equal([]string{
  1370  				"cluster.BeforeProcessPipeline",
  1371  				"shard.BeforeProcessPipeline",
  1372  				"shard.AfterProcessPipeline",
  1373  				"cluster.AfterProcessPipeline",
  1374  			}))
  1375  		})
  1376  
  1377  		It("should return correct replica for key", func() {
  1378  			client, err := client.SlaveForKey(ctx, "test")
  1379  			Expect(err).ToNot(HaveOccurred())
  1380  			info := client.Info(ctx, "server")
  1381  			Expect(info.Val()).Should(ContainSubstring("tcp_port:16604"))
  1382  		})
  1383  
  1384  		It("should return correct master for key", func() {
  1385  			client, err := client.MasterForKey(ctx, "test")
  1386  			Expect(err).ToNot(HaveOccurred())
  1387  			info := client.Info(ctx, "server")
  1388  			Expect(info.Val()).Should(ContainSubstring("tcp_port:16601"))
  1389  		})
  1390  
  1391  		assertClusterClient()
  1392  	})
  1393  
  1394  	Describe("ClusterClient with RouteByLatency", func() {
  1395  		BeforeEach(func() {
  1396  			opt = redisClusterOptions()
  1397  			opt.RouteByLatency = true
  1398  			client = cluster.newClusterClient(ctx, opt)
  1399  
  1400  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
  1401  				return master.FlushDB(ctx).Err()
  1402  			})
  1403  			Expect(err).NotTo(HaveOccurred())
  1404  
  1405  			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
  1406  				Eventually(func() int64 {
  1407  					return client.DBSize(ctx).Val()
  1408  				}, 30*time.Second).Should(Equal(int64(0)))
  1409  				return nil
  1410  			})
  1411  			Expect(err).NotTo(HaveOccurred())
  1412  		})
  1413  
  1414  		AfterEach(func() {
  1415  			err := client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
  1416  				return slave.ReadWrite(ctx).Err()
  1417  			})
  1418  			Expect(err).NotTo(HaveOccurred())
  1419  
  1420  			err = client.Close()
  1421  			Expect(err).NotTo(HaveOccurred())
  1422  		})
  1423  
  1424  		assertClusterClient()
  1425  	})
  1426  
  1427  	Describe("ClusterClient with ClusterSlots", func() {
  1428  		BeforeEach(func() {
  1429  			failover = true
  1430  
  1431  			opt = redisClusterOptions()
  1432  			opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
  1433  				slots := []redis.ClusterSlot{{
  1434  					Start: 0,
  1435  					End:   5460,
  1436  					Nodes: []redis.ClusterNode{{
  1437  						Addr: ":" + ringShard1Port,
  1438  					}},
  1439  				}, {
  1440  					Start: 5461,
  1441  					End:   10922,
  1442  					Nodes: []redis.ClusterNode{{
  1443  						Addr: ":" + ringShard2Port,
  1444  					}},
  1445  				}, {
  1446  					Start: 10923,
  1447  					End:   16383,
  1448  					Nodes: []redis.ClusterNode{{
  1449  						Addr: ":" + ringShard3Port,
  1450  					}},
  1451  				}}
  1452  				return slots, nil
  1453  			}
  1454  			client = cluster.newClusterClient(ctx, opt)
  1455  
  1456  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
  1457  				return master.FlushDB(ctx).Err()
  1458  			})
  1459  			Expect(err).NotTo(HaveOccurred())
  1460  
  1461  			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
  1462  				Eventually(func() int64 {
  1463  					return client.DBSize(ctx).Val()
  1464  				}, 30*time.Second).Should(Equal(int64(0)))
  1465  				return nil
  1466  			})
  1467  			Expect(err).NotTo(HaveOccurred())
  1468  		})
  1469  
  1470  		AfterEach(func() {
  1471  			failover = false
  1472  
  1473  			err := client.Close()
  1474  			Expect(err).NotTo(HaveOccurred())
  1475  		})
  1476  
  1477  		assertClusterClient()
  1478  	})
  1479  
  1480  	Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
  1481  		BeforeEach(func() {
  1482  			failover = true
  1483  
  1484  			opt = redisClusterOptions()
  1485  			opt.RouteRandomly = true
  1486  			opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
  1487  				slots := []redis.ClusterSlot{{
  1488  					Start: 0,
  1489  					End:   5460,
  1490  					Nodes: []redis.ClusterNode{{
  1491  						Addr: ":" + ringShard1Port,
  1492  					}},
  1493  				}, {
  1494  					Start: 5461,
  1495  					End:   10922,
  1496  					Nodes: []redis.ClusterNode{{
  1497  						Addr: ":" + ringShard2Port,
  1498  					}},
  1499  				}, {
  1500  					Start: 10923,
  1501  					End:   16383,
  1502  					Nodes: []redis.ClusterNode{{
  1503  						Addr: ":" + ringShard3Port,
  1504  					}},
  1505  				}}
  1506  				return slots, nil
  1507  			}
  1508  			client = cluster.newClusterClient(ctx, opt)
  1509  
  1510  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
  1511  				return master.FlushDB(ctx).Err()
  1512  			})
  1513  			Expect(err).NotTo(HaveOccurred())
  1514  
  1515  			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
  1516  				Eventually(func() int64 {
  1517  					return client.DBSize(ctx).Val()
  1518  				}, 30*time.Second).Should(Equal(int64(0)))
  1519  				return nil
  1520  			})
  1521  			Expect(err).NotTo(HaveOccurred())
  1522  		})
  1523  
  1524  		AfterEach(func() {
  1525  			failover = false
  1526  
  1527  			err := client.Close()
  1528  			Expect(err).NotTo(HaveOccurred())
  1529  		})
  1530  
  1531  		assertClusterClient()
  1532  	})
  1533  
  1534  	Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
  1535  		BeforeEach(func() {
  1536  			failover = true
  1537  
  1538  			opt = redisClusterOptions()
  1539  			opt.ReadOnly = true
  1540  			opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
  1541  				slots := []redis.ClusterSlot{{
  1542  					Start: 0,
  1543  					End:   5460,
  1544  					Nodes: []redis.ClusterNode{{
  1545  						Addr: ":16600",
  1546  					}, {
  1547  						Addr: ":16603",
  1548  					}},
  1549  				}, {
  1550  					Start: 5461,
  1551  					End:   10922,
  1552  					Nodes: []redis.ClusterNode{{
  1553  						Addr: ":16601",
  1554  					}, {
  1555  						Addr: ":16604",
  1556  					}},
  1557  				}, {
  1558  					Start: 10923,
  1559  					End:   16383,
  1560  					Nodes: []redis.ClusterNode{{
  1561  						Addr: ":16602",
  1562  					}, {
  1563  						Addr: ":16605",
  1564  					}},
  1565  				}}
  1566  				return slots, nil
  1567  			}
  1568  			client = cluster.newClusterClient(ctx, opt)
  1569  
  1570  			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
  1571  				return master.FlushDB(ctx).Err()
  1572  			})
  1573  			Expect(err).NotTo(HaveOccurred())
  1574  
  1575  			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
  1576  				Eventually(func() int64 {
  1577  					return client.DBSize(ctx).Val()
  1578  				}, 30*time.Second).Should(Equal(int64(0)))
  1579  				return nil
  1580  			})
  1581  			Expect(err).NotTo(HaveOccurred())
  1582  		})
  1583  
  1584  		AfterEach(func() {
  1585  			failover = false
  1586  
  1587  			err := client.Close()
  1588  			Expect(err).NotTo(HaveOccurred())
  1589  		})
  1590  
  1591  		assertClusterClient()
  1592  	})
  1593  })
  1594  
  1595  var _ = Describe("ClusterClient without nodes", func() {
  1596  	var client *redis.ClusterClient
  1597  
  1598  	BeforeEach(func() {
  1599  		client = redis.NewClusterClient(&redis.ClusterOptions{})
  1600  	})
  1601  
  1602  	AfterEach(func() {
  1603  		Expect(client.Close()).NotTo(HaveOccurred())
  1604  	})
  1605  
  1606  	It("Ping returns an error", func() {
  1607  		err := client.Ping(ctx).Err()
  1608  		Expect(err).To(MatchError("redis: cluster has no nodes"))
  1609  	})
  1610  
  1611  	It("pipeline returns an error", func() {
  1612  		_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  1613  			pipe.Ping(ctx)
  1614  			return nil
  1615  		})
  1616  		Expect(err).To(MatchError("redis: cluster has no nodes"))
  1617  	})
  1618  })
  1619  
  1620  var _ = Describe("ClusterClient without valid nodes", func() {
  1621  	var client *redis.ClusterClient
  1622  
  1623  	BeforeEach(func() {
  1624  		client = redis.NewClusterClient(&redis.ClusterOptions{
  1625  			Addrs: []string{redisAddr},
  1626  		})
  1627  	})
  1628  
  1629  	AfterEach(func() {
  1630  		Expect(client.Close()).NotTo(HaveOccurred())
  1631  	})
  1632  
  1633  	It("returns an error", func() {
  1634  		err := client.Ping(ctx).Err()
  1635  		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  1636  	})
  1637  
  1638  	It("pipeline returns an error", func() {
  1639  		_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  1640  			pipe.Ping(ctx)
  1641  			return nil
  1642  		})
  1643  		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  1644  	})
  1645  })
  1646  
  1647  var _ = Describe("ClusterClient with unavailable Cluster", func() {
  1648  	var client *redis.ClusterClient
  1649  
  1650  	BeforeEach(func() {
  1651  		opt := redisClusterOptions()
  1652  		opt.ReadTimeout = 250 * time.Millisecond
  1653  		opt.WriteTimeout = 250 * time.Millisecond
  1654  		opt.MaxRedirects = 1
  1655  		client = cluster.newClusterClientUnstable(opt)
  1656  		Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
  1657  
  1658  		for _, node := range cluster.clients {
  1659  			err := node.ClientPause(ctx, 5*time.Second).Err()
  1660  			Expect(err).NotTo(HaveOccurred())
  1661  		}
  1662  	})
  1663  
  1664  	AfterEach(func() {
  1665  		Expect(client.Close()).NotTo(HaveOccurred())
  1666  	})
  1667  
  1668  	It("recovers when Cluster recovers", func() {
  1669  		err := client.Ping(ctx).Err()
  1670  		Expect(err).To(HaveOccurred())
  1671  
  1672  		Eventually(func() error {
  1673  			return client.Ping(ctx).Err()
  1674  		}, "30s").ShouldNot(HaveOccurred())
  1675  	})
  1676  })
  1677  
  1678  var _ = Describe("ClusterClient timeout", func() {
  1679  	var client *redis.ClusterClient
  1680  
  1681  	AfterEach(func() {
  1682  		_ = client.Close()
  1683  	})
  1684  
  1685  	testTimeout := func() {
   1686  		It("Ping times out", func() {
  1687  			err := client.Ping(ctx).Err()
  1688  			Expect(err).To(HaveOccurred())
  1689  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1690  		})
  1691  
  1692  		It("Pipeline timeouts", func() {
  1693  			_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
  1694  				pipe.Ping(ctx)
  1695  				return nil
  1696  			})
  1697  			Expect(err).To(HaveOccurred())
  1698  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1699  		})
  1700  
  1701  		It("Tx timeouts", func() {
  1702  			err := client.Watch(ctx, func(tx *redis.Tx) error {
  1703  				return tx.Ping(ctx).Err()
  1704  			}, "foo")
  1705  			Expect(err).To(HaveOccurred())
  1706  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1707  		})
  1708  
  1709  		It("Tx Pipeline timeouts", func() {
  1710  			err := client.Watch(ctx, func(tx *redis.Tx) error {
  1711  				_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
  1712  					pipe.Ping(ctx)
  1713  					return nil
  1714  				})
  1715  				return err
  1716  			}, "foo")
  1717  			Expect(err).To(HaveOccurred())
  1718  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1719  		})
  1720  	}
  1721  
  1722  	const pause = 5 * time.Second
  1723  
  1724  	Context("read/write timeout", func() {
  1725  		BeforeEach(func() {
  1726  			opt := redisClusterOptions()
  1727  			client = cluster.newClusterClient(ctx, opt)
  1728  
  1729  			err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
  1730  				err := client.ClientPause(ctx, pause).Err()
  1731  
  1732  				opt := client.Options()
  1733  				opt.ReadTimeout = time.Nanosecond
  1734  				opt.WriteTimeout = time.Nanosecond
  1735  
  1736  				return err
  1737  			})
  1738  			Expect(err).NotTo(HaveOccurred())
  1739  
  1740  			// Overwrite timeouts after the client is initialized.
  1741  			opt.ReadTimeout = time.Nanosecond
  1742  			opt.WriteTimeout = time.Nanosecond
  1743  			opt.MaxRedirects = 0
  1744  		})
  1745  
  1746  		AfterEach(func() {
  1747  			_ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
  1748  				defer GinkgoRecover()
  1749  
  1750  				opt := client.Options()
  1751  				opt.ReadTimeout = time.Second
  1752  				opt.WriteTimeout = time.Second
  1753  
  1754  				Eventually(func() error {
  1755  					return client.Ping(ctx).Err()
  1756  				}, 2*pause).ShouldNot(HaveOccurred())
  1757  				return nil
  1758  			})
  1759  
  1760  			err := client.Close()
  1761  			Expect(err).NotTo(HaveOccurred())
  1762  		})
  1763  
  1764  		testTimeout()
  1765  	})
  1766  })
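
All four assertions reduce to the same check: the returned error implements net.Error and reports Timeout() == true. Outside a test, errors.As is the robust way to write that check with wrapped errors. A sketch, where the address is a placeholder for a reachable cluster node (the nanosecond timeouts are what force the timeout path on any real round trip):

	package main

	import (
		"context"
		"errors"
		"fmt"
		"net"
		"time"

		"github.com/redis/go-redis/v9"
	)

	func main() {
		c := redis.NewClusterClient(&redis.ClusterOptions{
			Addrs:        []string{"127.0.0.1:16600"}, // placeholder address
			ReadTimeout:  time.Nanosecond,
			WriteTimeout: time.Nanosecond,
		})
		defer c.Close()

		err := c.Ping(context.Background()).Err()
		var ne net.Error
		if errors.As(err, &ne) && ne.Timeout() {
			fmt.Println("timed out as expected:", err)
		}
	}
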
  1767  
  1768  var _ = Describe("ClusterClient ParseURL", func() {
  1769  	cases := []struct {
  1770  		test string
  1771  		url  string
  1772  		o    *redis.ClusterOptions // expected value
  1773  		err  error
  1774  	}{
  1775  		{
  1776  			test: "ParseRedisURL",
  1777  			url:  "redis://localhost:123",
  1778  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
  1779  		}, {
  1780  			test: "ParseRedissURL",
  1781  			url:  "rediss://localhost:123",
  1782  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
  1783  		}, {
  1784  			test: "MissingRedisPort",
  1785  			url:  "redis://localhost",
  1786  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
  1787  		}, {
  1788  			test: "MissingRedissPort",
  1789  			url:  "rediss://localhost",
  1790  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
  1791  		}, {
  1792  			test: "MultipleRedisURLs",
  1793  			url:  "redis://localhost:123?addr=localhost:1234&addr=localhost:12345",
  1794  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}},
  1795  		}, {
  1796  			test: "MultipleRedissURLs",
  1797  			url:  "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345",
  1798  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
  1799  		}, {
  1800  			test: "OnlyPassword",
  1801  			url:  "redis://:bar@localhost:123",
  1802  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
  1803  		}, {
  1804  			test: "OnlyUser",
  1805  			url:  "redis://foo@localhost:123",
  1806  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
  1807  		}, {
  1808  			test: "RedisUsernamePassword",
  1809  			url:  "redis://foo:bar@localhost:123",
  1810  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
  1811  		}, {
  1812  			test: "RedissUsernamePassword",
  1813  			url:  "rediss://foo:bar@localhost:123?addr=localhost:1234",
  1814  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ServerName: "localhost"}},
  1815  		}, {
  1816  			test: "QueryParameters",
  1817  			url:  "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234",
  1818  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
  1819  		}, {
  1820  			test: "DisabledTimeout",
  1821  			url:  "redis://localhost:123?conn_max_idle_time=0",
  1822  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
  1823  		}, {
  1824  			test: "DisabledTimeoutNeg",
  1825  			url:  "redis://localhost:123?conn_max_idle_time=-1",
  1826  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
  1827  		}, {
  1828  			test: "UseDefault",
  1829  			url:  "redis://localhost:123?conn_max_idle_time=",
  1830  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
  1831  		}, {
  1832  			test: "FailingTimeoutSeconds",
  1833  			url:  "redis://localhost:123?failing_timeout_seconds=25",
  1834  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, FailingTimeoutSeconds: 25},
  1835  		}, {
  1836  			test: "Protocol",
  1837  			url:  "redis://localhost:123?protocol=2",
  1838  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Protocol: 2},
  1839  		}, {
  1840  			test: "ClientName",
  1841  			url:  "redis://localhost:123?client_name=cluster_hi",
  1842  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
  1843  		}, {
  1844  			test: "UseDefaultMissing=",
  1845  			url:  "redis://localhost:123?conn_max_idle_time",
  1846  			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
  1847  		}, {
  1848  			test: "InvalidQueryAddr",
  1849  			url:  "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234",
  1850  			err:  errors.New(`redis: unable to parse addr param: rediss://foo:barr@localhost:1234`),
  1851  		}, {
  1852  			test: "InvalidInt",
  1853  			url:  "redis://localhost?pool_size=five",
  1854  			err:  errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
  1855  		}, {
  1856  			test: "InvalidBool",
  1857  			url:  "redis://localhost?pool_fifo=yes",
  1858  			err:  errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
  1859  		}, {
  1860  			test: "UnknownParam",
  1861  			url:  "redis://localhost?abc=123",
  1862  			err:  errors.New("redis: unexpected option: abc"),
  1863  		}, {
  1864  			test: "InvalidScheme",
  1865  			url:  "https://google.com",
  1866  			err:  errors.New("redis: invalid URL scheme: https"),
  1867  		},
  1868  	}
  1869  
  1870  	It("match ParseClusterURL", func() {
  1871  		for i := range cases {
  1872  			tc := cases[i]
  1873  			actual, err := redis.ParseClusterURL(tc.url)
  1874  			if tc.err != nil {
  1875  				Expect(err).Should(MatchError(tc.err))
  1876  			} else {
  1877  				Expect(err).NotTo(HaveOccurred())
  1878  			}
  1879  
  1880  			if err == nil {
  1881  				Expect(tc.o).NotTo(BeNil())
  1882  
  1883  				Expect(tc.o.Addrs).To(Equal(actual.Addrs))
  1884  				Expect(tc.o.TLSConfig).To(Equal(actual.TLSConfig))
  1885  				Expect(tc.o.Username).To(Equal(actual.Username))
  1886  				Expect(tc.o.Password).To(Equal(actual.Password))
  1887  				Expect(tc.o.MaxRetries).To(Equal(actual.MaxRetries))
  1888  				Expect(tc.o.MinRetryBackoff).To(Equal(actual.MinRetryBackoff))
  1889  				Expect(tc.o.MaxRetryBackoff).To(Equal(actual.MaxRetryBackoff))
  1890  				Expect(tc.o.DialTimeout).To(Equal(actual.DialTimeout))
  1891  				Expect(tc.o.ReadTimeout).To(Equal(actual.ReadTimeout))
  1892  				Expect(tc.o.WriteTimeout).To(Equal(actual.WriteTimeout))
  1893  				Expect(tc.o.PoolFIFO).To(Equal(actual.PoolFIFO))
  1894  				Expect(tc.o.PoolSize).To(Equal(actual.PoolSize))
  1895  				Expect(tc.o.MinIdleConns).To(Equal(actual.MinIdleConns))
  1896  				Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime))
  1897  				Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime))
  1898  				Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout))
  1899  				Expect(tc.o.FailingTimeoutSeconds).To(Equal(actual.FailingTimeoutSeconds))
  1900  			}
  1901  		}
  1902  	})
  1903  })
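
The table drives redis.ParseClusterURL through the supported schemes, multi-address addr parameters, credentials, and query options. Typical successful and failing calls look like this (the URL values are illustrative):

	package main

	import (
		"fmt"

		"github.com/redis/go-redis/v9"
	)

	func main() {
		opt, err := redis.ParseClusterURL(
			"redis://user:pass@localhost:6379?addr=localhost:6380&read_timeout=2&pool_fifo=true")
		if err != nil {
			panic(err)
		}
		fmt.Println(opt.Addrs)       // [localhost:6379 localhost:6380]
		fmt.Println(opt.Username)    // user
		fmt.Println(opt.ReadTimeout) // 2s
		fmt.Println(opt.PoolFIFO)    // true

		// Malformed input is reported, not silently ignored:
		if _, err := redis.ParseClusterURL("https://google.com"); err != nil {
			fmt.Println(err) // redis: invalid URL scheme: https
		}
	}
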
  1904  
  1905  var _ = Describe("ClusterClient FailingTimeoutSeconds", func() {
  1906  	var client *redis.ClusterClient
  1907  
  1908  	AfterEach(func() {
  1909  		if client != nil {
  1910  			_ = client.Close()
  1911  		}
  1912  	})
  1913  
  1914  	It("should use default failing timeout of 15 seconds", func() {
  1915  		opt := redisClusterOptions()
  1916  		client = cluster.newClusterClient(ctx, opt)
  1917  
  1918  		// Default should be 15 seconds
  1919  		Expect(opt.FailingTimeoutSeconds).To(Equal(15))
  1920  	})
  1921  
  1922  	It("should use custom failing timeout", func() {
  1923  		opt := redisClusterOptions()
  1924  		opt.FailingTimeoutSeconds = 30
  1925  		client = cluster.newClusterClient(ctx, opt)
  1926  
  1927  		// Should use custom value
  1928  		Expect(opt.FailingTimeoutSeconds).To(Equal(30))
  1929  	})
  1930  
  1931  	It("should parse failing_timeout_seconds from URL", func() {
  1932  		url := "redis://localhost:16600?failing_timeout_seconds=25"
  1933  		opt, err := redis.ParseClusterURL(url)
  1934  		Expect(err).NotTo(HaveOccurred())
  1935  		Expect(opt.FailingTimeoutSeconds).To(Equal(25))
  1936  	})
  1937  
  1938  	It("should handle node failing timeout correctly", func() {
  1939  		opt := redisClusterOptions()
  1940  		opt.FailingTimeoutSeconds = 2 // Short timeout for testing
  1941  		client = cluster.newClusterClient(ctx, opt)
  1942  
  1943  		// Get a node and mark it as failing
  1944  		nodes, err := client.Nodes(ctx, "A")
  1945  		Expect(err).NotTo(HaveOccurred())
  1946  		Expect(len(nodes)).To(BeNumerically(">", 0))
  1947  
  1948  		node := nodes[0]
  1949  
  1950  		// Initially not failing
  1951  		Expect(node.Failing()).To(BeFalse())
  1952  
  1953  		// Mark as failing
  1954  		node.MarkAsFailing()
  1955  		Expect(node.Failing()).To(BeTrue())
  1956  
  1957  		// Should still be failing after 1 second (less than timeout)
  1958  		time.Sleep(1 * time.Second)
  1959  		Expect(node.Failing()).To(BeTrue())
  1960  
  1961  		// Should not be failing after timeout expires
  1962  		time.Sleep(2 * time.Second) // Total 3 seconds > 2 second timeout
  1963  		Expect(node.Failing()).To(BeFalse())
  1964  	})
  1965  
  1966  	It("should handle zero timeout by using default", func() {
  1967  		opt := redisClusterOptions()
  1968  		opt.FailingTimeoutSeconds = 0 // Should use default
  1969  		client = cluster.newClusterClient(ctx, opt)
  1970  
  1971  		// After initialization, should be set to default
  1972  		Expect(opt.FailingTimeoutSeconds).To(Equal(15))
  1973  	})
  1974  })
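
The failure-window behaviour checked above condenses to the sketch below. It reuses the suite's own helpers (redisClusterOptions, cluster.newClusterClient) and the node accessors the test exercises (Nodes, MarkAsFailing, Failing); FailingTimeoutSeconds and these helpers are specific to this codebase rather than the public go-redis API, so treat the snippet as illustrative:

	opt := redisClusterOptions()
	opt.FailingTimeoutSeconds = 2 // failure window in seconds, as in the test
	client := cluster.newClusterClient(ctx, opt)
	defer client.Close()

	nodes, err := client.Nodes(ctx, "A") // nodes serving the slot for key "A"
	if err != nil || len(nodes) == 0 {
		panic("no nodes available")
	}
	node := nodes[0]

	node.MarkAsFailing()
	fmt.Println(node.Failing()) // true: still inside the 2-second window
	time.Sleep(3 * time.Second)
	fmt.Println(node.Failing()) // false: the window has lapsed
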
  1975  