...

Source file src/github.com/redis/go-redis/v9/osscluster.go

Documentation: github.com/redis/go-redis/v9

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"fmt"
     7  	"math"
     8  	"net"
     9  	"net/url"
    10  	"runtime"
    11  	"sort"
    12  	"strings"
    13  	"sync"
    14  	"sync/atomic"
    15  	"time"
    16  
    17  	"github.com/redis/go-redis/v9/auth"
    18  	"github.com/redis/go-redis/v9/internal"
    19  	"github.com/redis/go-redis/v9/internal/hashtag"
    20  	"github.com/redis/go-redis/v9/internal/pool"
    21  	"github.com/redis/go-redis/v9/internal/proto"
    22  	"github.com/redis/go-redis/v9/internal/rand"
    23  )
    24  
    25  const (
    26  	minLatencyMeasurementInterval = 10 * time.Second
    27  )
    28  
    29  var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
    30  
    31  // ClusterOptions are used to configure a cluster client and should be
    32  // passed to NewClusterClient.
    33  type ClusterOptions struct {
    34  	// A seed list of host:port addresses of cluster nodes.
    35  	Addrs []string
    36  
    37  	// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
    38  	ClientName string
    39  
    40  	// NewClient creates a cluster node client with provided name and options.
    41  	NewClient func(opt *Options) *Client
    42  
    43  	// The maximum number of retries before giving up. Command is retried
    44  	// on network errors and MOVED/ASK redirects.
    45  	// Default is 3 retries.
    46  	MaxRedirects int
    47  
    48  	// Enables read-only commands on slave nodes.
    49  	ReadOnly bool
    50  	// Allows routing read-only commands to the closest master or slave node.
    51  	// It automatically enables ReadOnly.
    52  	RouteByLatency bool
    53  	// Allows routing read-only commands to the random master or slave node.
    54  	// It automatically enables ReadOnly.
    55  	RouteRandomly bool
    56  
    57  	// Optional function that returns cluster slots information.
    58  	// It is useful to manually create cluster of standalone Redis servers
    59  	// and load-balance read/write operations between master and slaves.
    60  	// It can use service like ZooKeeper to maintain configuration information
    61  	// and Cluster.ReloadState to manually trigger state reloading.
    62  	ClusterSlots func(context.Context) ([]ClusterSlot, error)
    63  
    64  	// Following options are copied from Options struct.
    65  
    66  	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
    67  
    68  	OnConnect func(ctx context.Context, cn *Conn) error
    69  
    70  	Protocol                     int
    71  	Username                     string
    72  	Password                     string
    73  	CredentialsProvider          func() (username string, password string)
    74  	CredentialsProviderContext   func(ctx context.Context) (username string, password string, err error)
    75  	StreamingCredentialsProvider auth.StreamingCredentialsProvider
    76  
    77  	MaxRetries      int
    78  	MinRetryBackoff time.Duration
    79  	MaxRetryBackoff time.Duration
    80  
    81  	DialTimeout           time.Duration
    82  	ReadTimeout           time.Duration
    83  	WriteTimeout          time.Duration
    84  	ContextTimeoutEnabled bool
    85  
    86  	PoolFIFO        bool
    87  	PoolSize        int // applies per cluster node and not for the whole cluster
    88  	PoolTimeout     time.Duration
    89  	MinIdleConns    int
    90  	MaxIdleConns    int
    91  	MaxActiveConns  int // applies per cluster node and not for the whole cluster
    92  	ConnMaxIdleTime time.Duration
    93  	ConnMaxLifetime time.Duration
    94  
    95  	// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
    96  	// Larger buffers can improve performance for commands that return large responses.
    97  	// Smaller buffers can improve memory usage for larger pools.
    98  	//
    99  	// default: 32KiB (32768 bytes)
   100  	ReadBufferSize int
   101  
   102  	// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
   103  	// Larger buffers can improve performance for large pipelines and commands with many arguments.
   104  	// Smaller buffers can improve memory usage for larger pools.
   105  	//
   106  	// default: 32KiB (32768 bytes)
   107  	WriteBufferSize int
   108  
   109  	TLSConfig *tls.Config
   110  
   111  	// DisableIndentity - Disable set-lib on connect.
   112  	//
   113  	// default: false
   114  	//
   115  	// Deprecated: Use DisableIdentity instead.
   116  	DisableIndentity bool
   117  
   118  	// DisableIdentity is used to disable CLIENT SETINFO command on connect.
   119  	//
   120  	// default: false
   121  	DisableIdentity bool
   122  
   123  	IdentitySuffix string // Add suffix to client name. Default is empty.
   124  
   125  	// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
   126  	UnstableResp3 bool
   127  
   128  	// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
   129  	// When a node is marked as failing, it will be avoided for this duration.
   130  	// Default is 15 seconds.
   131  	FailingTimeoutSeconds int
   132  }
   133  
   134  func (opt *ClusterOptions) init() {
   135  	switch opt.MaxRedirects {
   136  	case -1:
   137  		opt.MaxRedirects = 0
   138  	case 0:
   139  		opt.MaxRedirects = 3
   140  	}
   141  
   142  	if opt.RouteByLatency || opt.RouteRandomly {
   143  		opt.ReadOnly = true
   144  	}
   145  
   146  	if opt.PoolSize == 0 {
   147  		opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
   148  	}
   149  	if opt.ReadBufferSize == 0 {
   150  		opt.ReadBufferSize = proto.DefaultBufferSize
   151  	}
   152  	if opt.WriteBufferSize == 0 {
   153  		opt.WriteBufferSize = proto.DefaultBufferSize
   154  	}
   155  
   156  	switch opt.ReadTimeout {
   157  	case -1:
   158  		opt.ReadTimeout = 0
   159  	case 0:
   160  		opt.ReadTimeout = 3 * time.Second
   161  	}
   162  	switch opt.WriteTimeout {
   163  	case -1:
   164  		opt.WriteTimeout = 0
   165  	case 0:
   166  		opt.WriteTimeout = opt.ReadTimeout
   167  	}
   168  
   169  	if opt.MaxRetries == 0 {
   170  		opt.MaxRetries = -1
   171  	}
   172  	switch opt.MinRetryBackoff {
   173  	case -1:
   174  		opt.MinRetryBackoff = 0
   175  	case 0:
   176  		opt.MinRetryBackoff = 8 * time.Millisecond
   177  	}
   178  	switch opt.MaxRetryBackoff {
   179  	case -1:
   180  		opt.MaxRetryBackoff = 0
   181  	case 0:
   182  		opt.MaxRetryBackoff = 512 * time.Millisecond
   183  	}
   184  
   185  	if opt.NewClient == nil {
   186  		opt.NewClient = NewClient
   187  	}
   188  
   189  	if opt.FailingTimeoutSeconds == 0 {
   190  		opt.FailingTimeoutSeconds = 15
   191  	}
   192  }
   193  
   194  // ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
   195  // The URL must be in the form:
   196  //
   197  //	redis://<user>:<password>@<host>:<port>
   198  //	or
   199  //	rediss://<user>:<password>@<host>:<port>
   200  //
   201  // To add additional addresses, specify the query parameter, "addr" one or more times. e.g:
   202  //
   203  //	redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
   204  //	or
   205  //	rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
   206  //
   207  // Most Option fields can be set using query parameters, with the following restrictions:
   208  //   - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
   209  //   - only scalar type fields are supported (bool, int, time.Duration)
   210  //   - for time.Duration fields, values must be a valid input for time.ParseDuration();
   211  //     additionally a plain integer as value (i.e. without unit) is interpreted as seconds
   212  //   - to disable a duration field, use value less than or equal to 0; to use the default
   213  //     value, leave the value blank or remove the parameter
   214  //   - only the last value is interpreted if a parameter is given multiple times
   215  //   - fields "network", "addr", "username" and "password" can only be set using other
   216  //     URL attributes (scheme, host, userinfo, resp.), query parameters using these
   217  //     names will be treated as unknown parameters
   218  //   - unknown parameter names will result in an error
   219  //
   220  // Example:
   221  //
   222  //	redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
   223  //	is equivalent to:
   224  //	&ClusterOptions{
   225  //		Addr:        ["localhost:6789", "localhost:6790", "localhost:6791"]
   226  //		DialTimeout: 3 * time.Second, // no time unit = seconds
   227  //		ReadTimeout: 6 * time.Second,
   228  //	}
   229  func ParseClusterURL(redisURL string) (*ClusterOptions, error) {
   230  	o := &ClusterOptions{}
   231  
   232  	u, err := url.Parse(redisURL)
   233  	if err != nil {
   234  		return nil, err
   235  	}
   236  
   237  	// add base URL to the array of addresses
   238  	// more addresses may be added through the URL params
   239  	h, p := getHostPortWithDefaults(u)
   240  	o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
   241  
   242  	// setup username, password, and other configurations
   243  	o, err = setupClusterConn(u, h, o)
   244  	if err != nil {
   245  		return nil, err
   246  	}
   247  
   248  	return o, nil
   249  }
   250  
   251  // setupClusterConn gets the username and password from the URL and the query parameters.
   252  func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptions, error) {
   253  	switch u.Scheme {
   254  	case "rediss":
   255  		o.TLSConfig = &tls.Config{ServerName: host}
   256  		fallthrough
   257  	case "redis":
   258  		o.Username, o.Password = getUserPassword(u)
   259  	default:
   260  		return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
   261  	}
   262  
   263  	// retrieve the configuration from the query parameters
   264  	o, err := setupClusterQueryParams(u, o)
   265  	if err != nil {
   266  		return nil, err
   267  	}
   268  
   269  	return o, nil
   270  }
   271  
   272  // setupClusterQueryParams converts query parameters in u to option value in o.
   273  func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
   274  	q := queryOptions{q: u.Query()}
   275  
   276  	o.Protocol = q.int("protocol")
   277  	o.ClientName = q.string("client_name")
   278  	o.MaxRedirects = q.int("max_redirects")
   279  	o.ReadOnly = q.bool("read_only")
   280  	o.RouteByLatency = q.bool("route_by_latency")
   281  	o.RouteRandomly = q.bool("route_randomly")
   282  	o.MaxRetries = q.int("max_retries")
   283  	o.MinRetryBackoff = q.duration("min_retry_backoff")
   284  	o.MaxRetryBackoff = q.duration("max_retry_backoff")
   285  	o.DialTimeout = q.duration("dial_timeout")
   286  	o.ReadTimeout = q.duration("read_timeout")
   287  	o.WriteTimeout = q.duration("write_timeout")
   288  	o.PoolFIFO = q.bool("pool_fifo")
   289  	o.PoolSize = q.int("pool_size")
   290  	o.MinIdleConns = q.int("min_idle_conns")
   291  	o.MaxIdleConns = q.int("max_idle_conns")
   292  	o.MaxActiveConns = q.int("max_active_conns")
   293  	o.PoolTimeout = q.duration("pool_timeout")
   294  	o.ConnMaxLifetime = q.duration("conn_max_lifetime")
   295  	o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
   296  	o.FailingTimeoutSeconds = q.int("failing_timeout_seconds")
   297  
   298  	if q.err != nil {
   299  		return nil, q.err
   300  	}
   301  
   302  	// addr can be specified as many times as needed
   303  	addrs := q.strings("addr")
   304  	for _, addr := range addrs {
   305  		h, p, err := net.SplitHostPort(addr)
   306  		if err != nil || h == "" || p == "" {
   307  			return nil, fmt.Errorf("redis: unable to parse addr param: %s", addr)
   308  		}
   309  
   310  		o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
   311  	}
   312  
   313  	// any parameters left?
   314  	if r := q.remaining(); len(r) > 0 {
   315  		return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
   316  	}
   317  
   318  	return o, nil
   319  }
   320  
   321  func (opt *ClusterOptions) clientOptions() *Options {
   322  	return &Options{
   323  		ClientName: opt.ClientName,
   324  		Dialer:     opt.Dialer,
   325  		OnConnect:  opt.OnConnect,
   326  
   327  		Protocol:                     opt.Protocol,
   328  		Username:                     opt.Username,
   329  		Password:                     opt.Password,
   330  		CredentialsProvider:          opt.CredentialsProvider,
   331  		CredentialsProviderContext:   opt.CredentialsProviderContext,
   332  		StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
   333  
   334  		MaxRetries:      opt.MaxRetries,
   335  		MinRetryBackoff: opt.MinRetryBackoff,
   336  		MaxRetryBackoff: opt.MaxRetryBackoff,
   337  
   338  		DialTimeout:           opt.DialTimeout,
   339  		ReadTimeout:           opt.ReadTimeout,
   340  		WriteTimeout:          opt.WriteTimeout,
   341  		ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
   342  
   343  		PoolFIFO:              opt.PoolFIFO,
   344  		PoolSize:              opt.PoolSize,
   345  		PoolTimeout:           opt.PoolTimeout,
   346  		MinIdleConns:          opt.MinIdleConns,
   347  		MaxIdleConns:          opt.MaxIdleConns,
   348  		MaxActiveConns:        opt.MaxActiveConns,
   349  		ConnMaxIdleTime:       opt.ConnMaxIdleTime,
   350  		ConnMaxLifetime:       opt.ConnMaxLifetime,
   351  		ReadBufferSize:        opt.ReadBufferSize,
   352  		WriteBufferSize:       opt.WriteBufferSize,
   353  		DisableIdentity:       opt.DisableIdentity,
   354  		DisableIndentity:      opt.DisableIdentity,
   355  		IdentitySuffix:        opt.IdentitySuffix,
   356  		FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
   357  		TLSConfig:             opt.TLSConfig,
   358  		// If ClusterSlots is populated, then we probably have an artificial
   359  		// cluster whose nodes are not in clustering mode (otherwise there isn't
   360  		// much use for ClusterSlots config).  This means we cannot execute the
   361  		// READONLY command against that node -- setting readOnly to false in such
   362  		// situations in the options below will prevent that from happening.
   363  		readOnly:      opt.ReadOnly && opt.ClusterSlots == nil,
   364  		UnstableResp3: opt.UnstableResp3,
   365  	}
   366  }
   367  
   368  //------------------------------------------------------------------------------
   369  
   370  type clusterNode struct {
   371  	Client *Client
   372  
   373  	latency    uint32 // atomic
   374  	generation uint32 // atomic
   375  	failing    uint32 // atomic
   376  	loaded     uint32 // atomic
   377  
   378  	// last time the latency measurement was performed for the node, stored in nanoseconds from epoch
   379  	lastLatencyMeasurement int64 // atomic
   380  }
   381  
   382  func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
   383  	opt := clOpt.clientOptions()
   384  	opt.Addr = addr
   385  	node := clusterNode{
   386  		Client: clOpt.NewClient(opt),
   387  	}
   388  
   389  	node.latency = math.MaxUint32
   390  	if clOpt.RouteByLatency {
   391  		go node.updateLatency()
   392  	}
   393  
   394  	return &node
   395  }
   396  
   397  func (n *clusterNode) String() string {
   398  	return n.Client.String()
   399  }
   400  
   401  func (n *clusterNode) Close() error {
   402  	return n.Client.Close()
   403  }
   404  
   405  const maximumNodeLatency = 1 * time.Minute
   406  
   407  func (n *clusterNode) updateLatency() {
   408  	const numProbe = 10
   409  	var dur uint64
   410  
   411  	successes := 0
   412  	for i := 0; i < numProbe; i++ {
   413  		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
   414  
   415  		start := time.Now()
   416  		err := n.Client.Ping(context.TODO()).Err()
   417  		if err == nil {
   418  			dur += uint64(time.Since(start) / time.Microsecond)
   419  			successes++
   420  		}
   421  	}
   422  
   423  	var latency float64
   424  	if successes == 0 {
   425  		// If none of the pings worked, set latency to some arbitrarily high value so this node gets
   426  		// least priority.
   427  		latency = float64((maximumNodeLatency) / time.Microsecond)
   428  	} else {
   429  		latency = float64(dur) / float64(successes)
   430  	}
   431  	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
   432  	n.SetLastLatencyMeasurement(time.Now())
   433  }
   434  
   435  func (n *clusterNode) Latency() time.Duration {
   436  	latency := atomic.LoadUint32(&n.latency)
   437  	return time.Duration(latency) * time.Microsecond
   438  }
   439  
   440  func (n *clusterNode) MarkAsFailing() {
   441  	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
   442  	atomic.StoreUint32(&n.loaded, 0)
   443  }
   444  
   445  func (n *clusterNode) Failing() bool {
   446  	timeout := int64(n.Client.opt.FailingTimeoutSeconds)
   447  
   448  	failing := atomic.LoadUint32(&n.failing)
   449  	if failing == 0 {
   450  		return false
   451  	}
   452  	if time.Now().Unix()-int64(failing) < timeout {
   453  		return true
   454  	}
   455  	atomic.StoreUint32(&n.failing, 0)
   456  	return false
   457  }
   458  
   459  func (n *clusterNode) Generation() uint32 {
   460  	return atomic.LoadUint32(&n.generation)
   461  }
   462  
   463  func (n *clusterNode) LastLatencyMeasurement() int64 {
   464  	return atomic.LoadInt64(&n.lastLatencyMeasurement)
   465  }
   466  
   467  func (n *clusterNode) SetGeneration(gen uint32) {
   468  	for {
   469  		v := atomic.LoadUint32(&n.generation)
   470  		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
   471  			break
   472  		}
   473  	}
   474  }
   475  
   476  func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
   477  	for {
   478  		v := atomic.LoadInt64(&n.lastLatencyMeasurement)
   479  		if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
   480  			break
   481  		}
   482  	}
   483  }
   484  
   485  func (n *clusterNode) Loading() bool {
   486  	loaded := atomic.LoadUint32(&n.loaded)
   487  	if loaded == 1 {
   488  		return false
   489  	}
   490  
   491  	// check if the node is loading
   492  	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
   493  	defer cancel()
   494  
   495  	err := n.Client.Ping(ctx).Err()
   496  	loading := err != nil && isLoadingError(err)
   497  	if !loading {
   498  		atomic.StoreUint32(&n.loaded, 1)
   499  	}
   500  	return loading
   501  }
   502  
   503  //------------------------------------------------------------------------------
   504  
   505  type clusterNodes struct {
   506  	opt *ClusterOptions
   507  
   508  	mu          sync.RWMutex
   509  	addrs       []string
   510  	nodes       map[string]*clusterNode
   511  	activeAddrs []string
   512  	closed      bool
   513  	onNewNode   []func(rdb *Client)
   514  
   515  	generation uint32 // atomic
   516  }
   517  
   518  func newClusterNodes(opt *ClusterOptions) *clusterNodes {
   519  	return &clusterNodes{
   520  		opt:   opt,
   521  		addrs: opt.Addrs,
   522  		nodes: make(map[string]*clusterNode),
   523  	}
   524  }
   525  
   526  func (c *clusterNodes) Close() error {
   527  	c.mu.Lock()
   528  	defer c.mu.Unlock()
   529  
   530  	if c.closed {
   531  		return nil
   532  	}
   533  	c.closed = true
   534  
   535  	var firstErr error
   536  	for _, node := range c.nodes {
   537  		if err := node.Client.Close(); err != nil && firstErr == nil {
   538  			firstErr = err
   539  		}
   540  	}
   541  
   542  	c.nodes = nil
   543  	c.activeAddrs = nil
   544  
   545  	return firstErr
   546  }
   547  
   548  func (c *clusterNodes) OnNewNode(fn func(rdb *Client)) {
   549  	c.mu.Lock()
   550  	c.onNewNode = append(c.onNewNode, fn)
   551  	c.mu.Unlock()
   552  }
   553  
   554  func (c *clusterNodes) Addrs() ([]string, error) {
   555  	var addrs []string
   556  
   557  	c.mu.RLock()
   558  	closed := c.closed //nolint:ifshort
   559  	if !closed {
   560  		if len(c.activeAddrs) > 0 {
   561  			addrs = make([]string, len(c.activeAddrs))
   562  			copy(addrs, c.activeAddrs)
   563  		} else {
   564  			addrs = make([]string, len(c.addrs))
   565  			copy(addrs, c.addrs)
   566  		}
   567  	}
   568  	c.mu.RUnlock()
   569  
   570  	if closed {
   571  		return nil, pool.ErrClosed
   572  	}
   573  	if len(addrs) == 0 {
   574  		return nil, errClusterNoNodes
   575  	}
   576  	return addrs, nil
   577  }
   578  
   579  func (c *clusterNodes) NextGeneration() uint32 {
   580  	return atomic.AddUint32(&c.generation, 1)
   581  }
   582  
   583  // GC removes unused nodes.
   584  func (c *clusterNodes) GC(generation uint32) {
   585  	var collected []*clusterNode
   586  
   587  	c.mu.Lock()
   588  
   589  	c.activeAddrs = c.activeAddrs[:0]
   590  	now := time.Now()
   591  	for addr, node := range c.nodes {
   592  		if node.Generation() >= generation {
   593  			c.activeAddrs = append(c.activeAddrs, addr)
   594  			if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
   595  				go node.updateLatency()
   596  			}
   597  			continue
   598  		}
   599  
   600  		delete(c.nodes, addr)
   601  		collected = append(collected, node)
   602  	}
   603  
   604  	c.mu.Unlock()
   605  
   606  	for _, node := range collected {
   607  		_ = node.Client.Close()
   608  	}
   609  }
   610  
   611  func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
   612  	node, err := c.get(addr)
   613  	if err != nil {
   614  		return nil, err
   615  	}
   616  	if node != nil {
   617  		return node, nil
   618  	}
   619  
   620  	c.mu.Lock()
   621  	defer c.mu.Unlock()
   622  
   623  	if c.closed {
   624  		return nil, pool.ErrClosed
   625  	}
   626  
   627  	node, ok := c.nodes[addr]
   628  	if ok {
   629  		return node, nil
   630  	}
   631  
   632  	node = newClusterNode(c.opt, addr)
   633  	for _, fn := range c.onNewNode {
   634  		fn(node.Client)
   635  	}
   636  
   637  	c.addrs = appendIfNotExist(c.addrs, addr)
   638  	c.nodes[addr] = node
   639  
   640  	return node, nil
   641  }
   642  
   643  func (c *clusterNodes) get(addr string) (*clusterNode, error) {
   644  	c.mu.RLock()
   645  	defer c.mu.RUnlock()
   646  
   647  	if c.closed {
   648  		return nil, pool.ErrClosed
   649  	}
   650  	return c.nodes[addr], nil
   651  }
   652  
   653  func (c *clusterNodes) All() ([]*clusterNode, error) {
   654  	c.mu.RLock()
   655  	defer c.mu.RUnlock()
   656  
   657  	if c.closed {
   658  		return nil, pool.ErrClosed
   659  	}
   660  
   661  	cp := make([]*clusterNode, 0, len(c.nodes))
   662  	for _, node := range c.nodes {
   663  		cp = append(cp, node)
   664  	}
   665  	return cp, nil
   666  }
   667  
   668  func (c *clusterNodes) Random() (*clusterNode, error) {
   669  	addrs, err := c.Addrs()
   670  	if err != nil {
   671  		return nil, err
   672  	}
   673  
   674  	n := rand.Intn(len(addrs))
   675  	return c.GetOrCreate(addrs[n])
   676  }
   677  
   678  //------------------------------------------------------------------------------
   679  
   680  type clusterSlot struct {
   681  	start int
   682  	end   int
   683  	nodes []*clusterNode
   684  }
   685  
   686  type clusterSlotSlice []*clusterSlot
   687  
   688  func (p clusterSlotSlice) Len() int {
   689  	return len(p)
   690  }
   691  
   692  func (p clusterSlotSlice) Less(i, j int) bool {
   693  	return p[i].start < p[j].start
   694  }
   695  
   696  func (p clusterSlotSlice) Swap(i, j int) {
   697  	p[i], p[j] = p[j], p[i]
   698  }
   699  
   700  type clusterState struct {
   701  	nodes   *clusterNodes
   702  	Masters []*clusterNode
   703  	Slaves  []*clusterNode
   704  
   705  	slots []*clusterSlot
   706  
   707  	generation uint32
   708  	createdAt  time.Time
   709  }
   710  
   711  func newClusterState(
   712  	nodes *clusterNodes, slots []ClusterSlot, origin string,
   713  ) (*clusterState, error) {
   714  	c := clusterState{
   715  		nodes: nodes,
   716  
   717  		slots: make([]*clusterSlot, 0, len(slots)),
   718  
   719  		generation: nodes.NextGeneration(),
   720  		createdAt:  time.Now(),
   721  	}
   722  
   723  	originHost, _, _ := net.SplitHostPort(origin)
   724  	isLoopbackOrigin := isLoopback(originHost)
   725  
   726  	for _, slot := range slots {
   727  		var nodes []*clusterNode
   728  		for i, slotNode := range slot.Nodes {
   729  			addr := slotNode.Addr
   730  			if !isLoopbackOrigin {
   731  				addr = replaceLoopbackHost(addr, originHost)
   732  			}
   733  
   734  			node, err := c.nodes.GetOrCreate(addr)
   735  			if err != nil {
   736  				return nil, err
   737  			}
   738  
   739  			node.SetGeneration(c.generation)
   740  			nodes = append(nodes, node)
   741  
   742  			if i == 0 {
   743  				c.Masters = appendIfNotExist(c.Masters, node)
   744  			} else {
   745  				c.Slaves = appendIfNotExist(c.Slaves, node)
   746  			}
   747  		}
   748  
   749  		c.slots = append(c.slots, &clusterSlot{
   750  			start: slot.Start,
   751  			end:   slot.End,
   752  			nodes: nodes,
   753  		})
   754  	}
   755  
   756  	sort.Sort(clusterSlotSlice(c.slots))
   757  
   758  	time.AfterFunc(time.Minute, func() {
   759  		nodes.GC(c.generation)
   760  	})
   761  
   762  	return &c, nil
   763  }
   764  
   765  func replaceLoopbackHost(nodeAddr, originHost string) string {
   766  	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
   767  	if err != nil {
   768  		return nodeAddr
   769  	}
   770  
   771  	nodeIP := net.ParseIP(nodeHost)
   772  	if nodeIP == nil {
   773  		return nodeAddr
   774  	}
   775  
   776  	if !nodeIP.IsLoopback() {
   777  		return nodeAddr
   778  	}
   779  
   780  	// Use origin host which is not loopback and node port.
   781  	return net.JoinHostPort(originHost, nodePort)
   782  }
   783  
   784  // isLoopback returns true if the host is a loopback address.
   785  // For IP addresses, it uses net.IP.IsLoopback().
   786  // For hostnames, it recognizes well-known loopback hostnames like "localhost"
   787  // and Docker-specific loopback patterns like "*.docker.internal".
   788  func isLoopback(host string) bool {
   789  	ip := net.ParseIP(host)
   790  	if ip != nil {
   791  		return ip.IsLoopback()
   792  	}
   793  
   794  	if strings.ToLower(host) == "localhost" {
   795  		return true
   796  	}
   797  
   798  	if strings.HasSuffix(strings.ToLower(host), ".docker.internal") {
   799  		return true
   800  	}
   801  
   802  	return false
   803  }
   804  
   805  func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
   806  	nodes := c.slotNodes(slot)
   807  	if len(nodes) > 0 {
   808  		return nodes[0], nil
   809  	}
   810  	return c.nodes.Random()
   811  }
   812  
   813  func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
   814  	nodes := c.slotNodes(slot)
   815  	switch len(nodes) {
   816  	case 0:
   817  		return c.nodes.Random()
   818  	case 1:
   819  		return nodes[0], nil
   820  	case 2:
   821  		slave := nodes[1]
   822  		if !slave.Failing() && !slave.Loading() {
   823  			return slave, nil
   824  		}
   825  		return nodes[0], nil
   826  	default:
   827  		var slave *clusterNode
   828  		for i := 0; i < 10; i++ {
   829  			n := rand.Intn(len(nodes)-1) + 1
   830  			slave = nodes[n]
   831  			if !slave.Failing() && !slave.Loading() {
   832  				return slave, nil
   833  			}
   834  		}
   835  
   836  		// All slaves are loading - use master.
   837  		return nodes[0], nil
   838  	}
   839  }
   840  
   841  func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
   842  	nodes := c.slotNodes(slot)
   843  	if len(nodes) == 0 {
   844  		return c.nodes.Random()
   845  	}
   846  
   847  	var allNodesFailing = true
   848  	var (
   849  		closestNonFailingNode *clusterNode
   850  		closestNode           *clusterNode
   851  		minLatency            time.Duration
   852  	)
   853  
   854  	// setting the max possible duration as zerovalue for minlatency
   855  	minLatency = time.Duration(math.MaxInt64)
   856  
   857  	for _, n := range nodes {
   858  		if closestNode == nil || n.Latency() < minLatency {
   859  			closestNode = n
   860  			minLatency = n.Latency()
   861  			if !n.Failing() {
   862  				closestNonFailingNode = n
   863  				allNodesFailing = false
   864  			}
   865  		}
   866  	}
   867  
   868  	// pick the healthly node with the lowest latency
   869  	if !allNodesFailing && closestNonFailingNode != nil {
   870  		return closestNonFailingNode, nil
   871  	}
   872  
   873  	// if all nodes are failing, we will pick the temporarily failing node with lowest latency
   874  	if minLatency < maximumNodeLatency && closestNode != nil {
   875  		internal.Logger.Printf(context.TODO(), "redis: all nodes are marked as failed, picking the temporarily failing node with lowest latency")
   876  		return closestNode, nil
   877  	}
   878  
   879  	// If all nodes are having the maximum latency(all pings are failing) - return a random node across the cluster
   880  	internal.Logger.Printf(context.TODO(), "redis: pings to all nodes are failing, picking a random node across the cluster")
   881  	return c.nodes.Random()
   882  }
   883  
   884  func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
   885  	nodes := c.slotNodes(slot)
   886  	if len(nodes) == 0 {
   887  		return c.nodes.Random()
   888  	}
   889  	if len(nodes) == 1 {
   890  		return nodes[0], nil
   891  	}
   892  	randomNodes := rand.Perm(len(nodes))
   893  	for _, idx := range randomNodes {
   894  		if node := nodes[idx]; !node.Failing() {
   895  			return node, nil
   896  		}
   897  	}
   898  	return nodes[randomNodes[0]], nil
   899  }
   900  
   901  func (c *clusterState) slotNodes(slot int) []*clusterNode {
   902  	i := sort.Search(len(c.slots), func(i int) bool {
   903  		return c.slots[i].end >= slot
   904  	})
   905  	if i >= len(c.slots) {
   906  		return nil
   907  	}
   908  	x := c.slots[i]
   909  	if slot >= x.start && slot <= x.end {
   910  		return x.nodes
   911  	}
   912  	return nil
   913  }
   914  
   915  //------------------------------------------------------------------------------
   916  
   917  type clusterStateHolder struct {
   918  	load func(ctx context.Context) (*clusterState, error)
   919  
   920  	state     atomic.Value
   921  	reloading uint32 // atomic
   922  }
   923  
   924  func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
   925  	return &clusterStateHolder{
   926  		load: fn,
   927  	}
   928  }
   929  
   930  func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
   931  	state, err := c.load(ctx)
   932  	if err != nil {
   933  		return nil, err
   934  	}
   935  	c.state.Store(state)
   936  	return state, nil
   937  }
   938  
   939  func (c *clusterStateHolder) LazyReload() {
   940  	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
   941  		return
   942  	}
   943  	go func() {
   944  		defer atomic.StoreUint32(&c.reloading, 0)
   945  
   946  		_, err := c.Reload(context.Background())
   947  		if err != nil {
   948  			return
   949  		}
   950  		time.Sleep(200 * time.Millisecond)
   951  	}()
   952  }
   953  
   954  func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
   955  	v := c.state.Load()
   956  	if v == nil {
   957  		return c.Reload(ctx)
   958  	}
   959  
   960  	state := v.(*clusterState)
   961  	if time.Since(state.createdAt) > 10*time.Second {
   962  		c.LazyReload()
   963  	}
   964  	return state, nil
   965  }
   966  
   967  func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
   968  	state, err := c.Reload(ctx)
   969  	if err == nil {
   970  		return state, nil
   971  	}
   972  	return c.Get(ctx)
   973  }
   974  
   975  //------------------------------------------------------------------------------
   976  
   977  // ClusterClient is a Redis Cluster client representing a pool of zero
   978  // or more underlying connections. It's safe for concurrent use by
   979  // multiple goroutines.
   980  type ClusterClient struct {
   981  	opt           *ClusterOptions
   982  	nodes         *clusterNodes
   983  	state         *clusterStateHolder
   984  	cmdsInfoCache *cmdsInfoCache
   985  	cmdable
   986  	hooksMixin
   987  }
   988  
   989  // NewClusterClient returns a Redis Cluster client as described in
   990  // http://redis.io/topics/cluster-spec.
   991  func NewClusterClient(opt *ClusterOptions) *ClusterClient {
   992  	if opt == nil {
   993  		panic("redis: NewClusterClient nil options")
   994  	}
   995  	opt.init()
   996  
   997  	c := &ClusterClient{
   998  		opt:   opt,
   999  		nodes: newClusterNodes(opt),
  1000  	}
  1001  
  1002  	c.state = newClusterStateHolder(c.loadState)
  1003  	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
  1004  	c.cmdable = c.Process
  1005  
  1006  	c.initHooks(hooks{
  1007  		dial:       nil,
  1008  		process:    c.process,
  1009  		pipeline:   c.processPipeline,
  1010  		txPipeline: c.processTxPipeline,
  1011  	})
  1012  
  1013  	return c
  1014  }
  1015  
  1016  // Options returns read-only Options that were used to create the client.
  1017  func (c *ClusterClient) Options() *ClusterOptions {
  1018  	return c.opt
  1019  }
  1020  
  1021  // ReloadState reloads cluster state. If available it calls ClusterSlots func
  1022  // to get cluster slots information.
  1023  func (c *ClusterClient) ReloadState(ctx context.Context) {
  1024  	c.state.LazyReload()
  1025  }
  1026  
  1027  // Close closes the cluster client, releasing any open resources.
  1028  //
  1029  // It is rare to Close a ClusterClient, as the ClusterClient is meant
  1030  // to be long-lived and shared between many goroutines.
  1031  func (c *ClusterClient) Close() error {
  1032  	return c.nodes.Close()
  1033  }
  1034  
  1035  func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
  1036  	err := c.processHook(ctx, cmd)
  1037  	cmd.SetErr(err)
  1038  	return err
  1039  }
  1040  
  1041  func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
  1042  	slot := c.cmdSlot(cmd, -1)
  1043  	var node *clusterNode
  1044  	var moved bool
  1045  	var ask bool
  1046  	var lastErr error
  1047  	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
  1048  		// MOVED and ASK responses are not transient errors that require retry delay; they
  1049  		// should be attempted immediately.
  1050  		if attempt > 0 && !moved && !ask {
  1051  			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  1052  				return err
  1053  			}
  1054  		}
  1055  
  1056  		if node == nil {
  1057  			var err error
  1058  			node, err = c.cmdNode(ctx, cmd.Name(), slot)
  1059  			if err != nil {
  1060  				return err
  1061  			}
  1062  		}
  1063  
  1064  		if ask {
  1065  			ask = false
  1066  
  1067  			pipe := node.Client.Pipeline()
  1068  			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
  1069  			_ = pipe.Process(ctx, cmd)
  1070  			_, lastErr = pipe.Exec(ctx)
  1071  		} else {
  1072  			lastErr = node.Client.Process(ctx, cmd)
  1073  		}
  1074  
  1075  		// If there is no error - we are done.
  1076  		if lastErr == nil {
  1077  			return nil
  1078  		}
  1079  		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
  1080  			if isReadOnly {
  1081  				c.state.LazyReload()
  1082  			}
  1083  			node = nil
  1084  			continue
  1085  		}
  1086  
  1087  		// If slave is loading - pick another node.
  1088  		if c.opt.ReadOnly && isLoadingError(lastErr) {
  1089  			node.MarkAsFailing()
  1090  			node = nil
  1091  			continue
  1092  		}
  1093  
  1094  		var addr string
  1095  		moved, ask, addr = isMovedError(lastErr)
  1096  		if moved || ask {
  1097  			c.state.LazyReload()
  1098  
  1099  			var err error
  1100  			node, err = c.nodes.GetOrCreate(addr)
  1101  			if err != nil {
  1102  				return err
  1103  			}
  1104  			continue
  1105  		}
  1106  
  1107  		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
  1108  			// First retry the same node.
  1109  			if attempt == 0 {
  1110  				continue
  1111  			}
  1112  
  1113  			// Second try another node.
  1114  			node.MarkAsFailing()
  1115  			node = nil
  1116  			continue
  1117  		}
  1118  
  1119  		return lastErr
  1120  	}
  1121  	return lastErr
  1122  }
  1123  
  1124  func (c *ClusterClient) OnNewNode(fn func(rdb *Client)) {
  1125  	c.nodes.OnNewNode(fn)
  1126  }
  1127  
  1128  // ForEachMaster concurrently calls the fn on each master node in the cluster.
  1129  // It returns the first error if any.
  1130  func (c *ClusterClient) ForEachMaster(
  1131  	ctx context.Context,
  1132  	fn func(ctx context.Context, client *Client) error,
  1133  ) error {
  1134  	state, err := c.state.ReloadOrGet(ctx)
  1135  	if err != nil {
  1136  		return err
  1137  	}
  1138  
  1139  	var wg sync.WaitGroup
  1140  	errCh := make(chan error, 1)
  1141  
  1142  	for _, master := range state.Masters {
  1143  		wg.Add(1)
  1144  		go func(node *clusterNode) {
  1145  			defer wg.Done()
  1146  			err := fn(ctx, node.Client)
  1147  			if err != nil {
  1148  				select {
  1149  				case errCh <- err:
  1150  				default:
  1151  				}
  1152  			}
  1153  		}(master)
  1154  	}
  1155  
  1156  	wg.Wait()
  1157  
  1158  	select {
  1159  	case err := <-errCh:
  1160  		return err
  1161  	default:
  1162  		return nil
  1163  	}
  1164  }
  1165  
  1166  // ForEachSlave concurrently calls the fn on each slave node in the cluster.
  1167  // It returns the first error if any.
  1168  func (c *ClusterClient) ForEachSlave(
  1169  	ctx context.Context,
  1170  	fn func(ctx context.Context, client *Client) error,
  1171  ) error {
  1172  	state, err := c.state.ReloadOrGet(ctx)
  1173  	if err != nil {
  1174  		return err
  1175  	}
  1176  
  1177  	var wg sync.WaitGroup
  1178  	errCh := make(chan error, 1)
  1179  
  1180  	for _, slave := range state.Slaves {
  1181  		wg.Add(1)
  1182  		go func(node *clusterNode) {
  1183  			defer wg.Done()
  1184  			err := fn(ctx, node.Client)
  1185  			if err != nil {
  1186  				select {
  1187  				case errCh <- err:
  1188  				default:
  1189  				}
  1190  			}
  1191  		}(slave)
  1192  	}
  1193  
  1194  	wg.Wait()
  1195  
  1196  	select {
  1197  	case err := <-errCh:
  1198  		return err
  1199  	default:
  1200  		return nil
  1201  	}
  1202  }
  1203  
  1204  // ForEachShard concurrently calls the fn on each known node in the cluster.
  1205  // It returns the first error if any.
  1206  func (c *ClusterClient) ForEachShard(
  1207  	ctx context.Context,
  1208  	fn func(ctx context.Context, client *Client) error,
  1209  ) error {
  1210  	state, err := c.state.ReloadOrGet(ctx)
  1211  	if err != nil {
  1212  		return err
  1213  	}
  1214  
  1215  	var wg sync.WaitGroup
  1216  	errCh := make(chan error, 1)
  1217  
  1218  	worker := func(node *clusterNode) {
  1219  		defer wg.Done()
  1220  		err := fn(ctx, node.Client)
  1221  		if err != nil {
  1222  			select {
  1223  			case errCh <- err:
  1224  			default:
  1225  			}
  1226  		}
  1227  	}
  1228  
  1229  	for _, node := range state.Masters {
  1230  		wg.Add(1)
  1231  		go worker(node)
  1232  	}
  1233  	for _, node := range state.Slaves {
  1234  		wg.Add(1)
  1235  		go worker(node)
  1236  	}
  1237  
  1238  	wg.Wait()
  1239  
  1240  	select {
  1241  	case err := <-errCh:
  1242  		return err
  1243  	default:
  1244  		return nil
  1245  	}
  1246  }
  1247  
  1248  // PoolStats returns accumulated connection pool stats.
  1249  func (c *ClusterClient) PoolStats() *PoolStats {
  1250  	var acc PoolStats
  1251  
  1252  	state, _ := c.state.Get(context.TODO())
  1253  	if state == nil {
  1254  		return &acc
  1255  	}
  1256  
  1257  	for _, node := range state.Masters {
  1258  		s := node.Client.connPool.Stats()
  1259  		acc.Hits += s.Hits
  1260  		acc.Misses += s.Misses
  1261  		acc.Timeouts += s.Timeouts
  1262  
  1263  		acc.TotalConns += s.TotalConns
  1264  		acc.IdleConns += s.IdleConns
  1265  		acc.StaleConns += s.StaleConns
  1266  	}
  1267  
  1268  	for _, node := range state.Slaves {
  1269  		s := node.Client.connPool.Stats()
  1270  		acc.Hits += s.Hits
  1271  		acc.Misses += s.Misses
  1272  		acc.Timeouts += s.Timeouts
  1273  
  1274  		acc.TotalConns += s.TotalConns
  1275  		acc.IdleConns += s.IdleConns
  1276  		acc.StaleConns += s.StaleConns
  1277  	}
  1278  
  1279  	return &acc
  1280  }
  1281  
  1282  func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
  1283  	if c.opt.ClusterSlots != nil {
  1284  		slots, err := c.opt.ClusterSlots(ctx)
  1285  		if err != nil {
  1286  			return nil, err
  1287  		}
  1288  		return newClusterState(c.nodes, slots, "")
  1289  	}
  1290  
  1291  	addrs, err := c.nodes.Addrs()
  1292  	if err != nil {
  1293  		return nil, err
  1294  	}
  1295  
  1296  	var firstErr error
  1297  
  1298  	for _, idx := range rand.Perm(len(addrs)) {
  1299  		addr := addrs[idx]
  1300  
  1301  		node, err := c.nodes.GetOrCreate(addr)
  1302  		if err != nil {
  1303  			if firstErr == nil {
  1304  				firstErr = err
  1305  			}
  1306  			continue
  1307  		}
  1308  
  1309  		slots, err := node.Client.ClusterSlots(ctx).Result()
  1310  		if err != nil {
  1311  			if firstErr == nil {
  1312  				firstErr = err
  1313  			}
  1314  			continue
  1315  		}
  1316  
  1317  		return newClusterState(c.nodes, slots, addr)
  1318  	}
  1319  
  1320  	/*
  1321  	 * No node is connectable. It's possible that all nodes' IP has changed.
  1322  	 * Clear activeAddrs to let client be able to re-connect using the initial
  1323  	 * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
  1324  	 * which might have chance to resolve domain name and get updated IP address.
  1325  	 */
  1326  	c.nodes.mu.Lock()
  1327  	c.nodes.activeAddrs = nil
  1328  	c.nodes.mu.Unlock()
  1329  
  1330  	return nil, firstErr
  1331  }
  1332  
  1333  func (c *ClusterClient) Pipeline() Pipeliner {
  1334  	pipe := Pipeline{
  1335  		exec: pipelineExecer(c.processPipelineHook),
  1336  	}
  1337  	pipe.init()
  1338  	return &pipe
  1339  }
  1340  
  1341  func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
  1342  	return c.Pipeline().Pipelined(ctx, fn)
  1343  }
  1344  
  1345  func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
  1346  	cmdsMap := newCmdsMap()
  1347  
  1348  	if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
  1349  		setCmdsErr(cmds, err)
  1350  		return err
  1351  	}
  1352  
  1353  	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
  1354  		if attempt > 0 {
  1355  			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  1356  				setCmdsErr(cmds, err)
  1357  				return err
  1358  			}
  1359  		}
  1360  
  1361  		failedCmds := newCmdsMap()
  1362  		var wg sync.WaitGroup
  1363  
  1364  		for node, cmds := range cmdsMap.m {
  1365  			wg.Add(1)
  1366  			go func(node *clusterNode, cmds []Cmder) {
  1367  				defer wg.Done()
  1368  				c.processPipelineNode(ctx, node, cmds, failedCmds)
  1369  			}(node, cmds)
  1370  		}
  1371  
  1372  		wg.Wait()
  1373  		if len(failedCmds.m) == 0 {
  1374  			break
  1375  		}
  1376  		cmdsMap = failedCmds
  1377  	}
  1378  
  1379  	return cmdsFirstErr(cmds)
  1380  }
  1381  
  1382  func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
  1383  	state, err := c.state.Get(ctx)
  1384  	if err != nil {
  1385  		return err
  1386  	}
  1387  
  1388  	preferredRandomSlot := -1
  1389  	if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
  1390  		for _, cmd := range cmds {
  1391  			slot := c.cmdSlot(cmd, preferredRandomSlot)
  1392  			if preferredRandomSlot == -1 {
  1393  				preferredRandomSlot = slot
  1394  			}
  1395  			node, err := c.slotReadOnlyNode(state, slot)
  1396  			if err != nil {
  1397  				return err
  1398  			}
  1399  			cmdsMap.Add(node, cmd)
  1400  		}
  1401  		return nil
  1402  	}
  1403  
  1404  	for _, cmd := range cmds {
  1405  		slot := c.cmdSlot(cmd, preferredRandomSlot)
  1406  		if preferredRandomSlot == -1 {
  1407  			preferredRandomSlot = slot
  1408  		}
  1409  		node, err := state.slotMasterNode(slot)
  1410  		if err != nil {
  1411  			return err
  1412  		}
  1413  		cmdsMap.Add(node, cmd)
  1414  	}
  1415  	return nil
  1416  }
  1417  
  1418  func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
  1419  	for _, cmd := range cmds {
  1420  		cmdInfo := c.cmdInfo(ctx, cmd.Name())
  1421  		if cmdInfo == nil || !cmdInfo.ReadOnly {
  1422  			return false
  1423  		}
  1424  	}
  1425  	return true
  1426  }
  1427  
  1428  func (c *ClusterClient) processPipelineNode(
  1429  	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
  1430  ) {
  1431  	_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
  1432  		cn, err := node.Client.getConn(ctx)
  1433  		if err != nil {
  1434  			if !isContextError(err) {
  1435  				node.MarkAsFailing()
  1436  			}
  1437  			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
  1438  			setCmdsErr(cmds, err)
  1439  			return err
  1440  		}
  1441  
  1442  		var processErr error
  1443  		defer func() {
  1444  			node.Client.releaseConn(ctx, cn, processErr)
  1445  		}()
  1446  		processErr = c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
  1447  
  1448  		return processErr
  1449  	})
  1450  }
  1451  
  1452  func (c *ClusterClient) processPipelineNodeConn(
  1453  	ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
  1454  ) error {
  1455  	if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
  1456  		return writeCmds(wr, cmds)
  1457  	}); err != nil {
  1458  		if isBadConn(err, false, node.Client.getAddr()) {
  1459  			node.MarkAsFailing()
  1460  		}
  1461  		if shouldRetry(err, true) {
  1462  			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
  1463  		}
  1464  		setCmdsErr(cmds, err)
  1465  		return err
  1466  	}
  1467  
  1468  	return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
  1469  		return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
  1470  	})
  1471  }
  1472  
  1473  func (c *ClusterClient) pipelineReadCmds(
  1474  	ctx context.Context,
  1475  	node *clusterNode,
  1476  	rd *proto.Reader,
  1477  	cmds []Cmder,
  1478  	failedCmds *cmdsMap,
  1479  ) error {
  1480  	for i, cmd := range cmds {
  1481  		err := cmd.readReply(rd)
  1482  		cmd.SetErr(err)
  1483  
  1484  		if err == nil {
  1485  			continue
  1486  		}
  1487  
  1488  		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
  1489  			continue
  1490  		}
  1491  
  1492  		if c.opt.ReadOnly && isBadConn(err, false, node.Client.getAddr()) {
  1493  			node.MarkAsFailing()
  1494  		}
  1495  
  1496  		if !isRedisError(err) {
  1497  			if shouldRetry(err, true) {
  1498  				_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
  1499  			}
  1500  			setCmdsErr(cmds[i+1:], err)
  1501  			return err
  1502  		}
  1503  	}
  1504  
  1505  	if err := cmds[0].Err(); err != nil && shouldRetry(err, true) {
  1506  		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
  1507  		return err
  1508  	}
  1509  
  1510  	return nil
  1511  }
  1512  
  1513  func (c *ClusterClient) checkMovedErr(
  1514  	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
  1515  ) bool {
  1516  	moved, ask, addr := isMovedError(err)
  1517  	if !moved && !ask {
  1518  		return false
  1519  	}
  1520  
  1521  	node, err := c.nodes.GetOrCreate(addr)
  1522  	if err != nil {
  1523  		return false
  1524  	}
  1525  
  1526  	if moved {
  1527  		c.state.LazyReload()
  1528  		failedCmds.Add(node, cmd)
  1529  		return true
  1530  	}
  1531  
  1532  	if ask {
  1533  		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
  1534  		return true
  1535  	}
  1536  
  1537  	panic("not reached")
  1538  }
  1539  
  1540  // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  1541  func (c *ClusterClient) TxPipeline() Pipeliner {
  1542  	pipe := Pipeline{
  1543  		exec: func(ctx context.Context, cmds []Cmder) error {
  1544  			cmds = wrapMultiExec(ctx, cmds)
  1545  			return c.processTxPipelineHook(ctx, cmds)
  1546  		},
  1547  	}
  1548  	pipe.init()
  1549  	return &pipe
  1550  }
  1551  
  1552  func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
  1553  	return c.TxPipeline().Pipelined(ctx, fn)
  1554  }
  1555  
  1556  func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  1557  	// Trim multi .. exec.
  1558  	cmds = cmds[1 : len(cmds)-1]
  1559  
  1560  	if len(cmds) == 0 {
  1561  		return nil
  1562  	}
  1563  
  1564  	state, err := c.state.Get(ctx)
  1565  	if err != nil {
  1566  		setCmdsErr(cmds, err)
  1567  		return err
  1568  	}
  1569  
  1570  	keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
  1571  	slot := -1
  1572  	switch len(keyedCmdsBySlot) {
  1573  	case 0:
  1574  		slot = hashtag.RandomSlot()
  1575  	case 1:
  1576  		for sl := range keyedCmdsBySlot {
  1577  			slot = sl
  1578  			break
  1579  		}
  1580  	default:
  1581  		// TxPipeline does not support cross slot transaction.
  1582  		setCmdsErr(cmds, ErrCrossSlot)
  1583  		return ErrCrossSlot
  1584  	}
  1585  
  1586  	node, err := state.slotMasterNode(slot)
  1587  	if err != nil {
  1588  		setCmdsErr(cmds, err)
  1589  		return err
  1590  	}
  1591  
  1592  	cmdsMap := map[*clusterNode][]Cmder{node: cmds}
  1593  	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
  1594  		if attempt > 0 {
  1595  			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  1596  				setCmdsErr(cmds, err)
  1597  				return err
  1598  			}
  1599  		}
  1600  
  1601  		failedCmds := newCmdsMap()
  1602  		var wg sync.WaitGroup
  1603  
  1604  		for node, cmds := range cmdsMap {
  1605  			wg.Add(1)
  1606  			go func(node *clusterNode, cmds []Cmder) {
  1607  				defer wg.Done()
  1608  				c.processTxPipelineNode(ctx, node, cmds, failedCmds)
  1609  			}(node, cmds)
  1610  		}
  1611  
  1612  		wg.Wait()
  1613  		if len(failedCmds.m) == 0 {
  1614  			break
  1615  		}
  1616  		cmdsMap = failedCmds.m
  1617  	}
  1618  
  1619  	return cmdsFirstErr(cmds)
  1620  }
  1621  
  1622  // slottedKeyedCommands returns a map of slot to commands taking into account
  1623  // only commands that have keys.
  1624  func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
  1625  	cmdsSlots := map[int][]Cmder{}
  1626  
  1627  	preferredRandomSlot := -1
  1628  	for _, cmd := range cmds {
  1629  		if cmdFirstKeyPos(cmd) == 0 {
  1630  			continue
  1631  		}
  1632  
  1633  		slot := c.cmdSlot(cmd, preferredRandomSlot)
  1634  		if preferredRandomSlot == -1 {
  1635  			preferredRandomSlot = slot
  1636  		}
  1637  
  1638  		cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
  1639  	}
  1640  
  1641  	return cmdsSlots
  1642  }
  1643  
  1644  func (c *ClusterClient) processTxPipelineNode(
  1645  	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
  1646  ) {
  1647  	cmds = wrapMultiExec(ctx, cmds)
  1648  	_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
  1649  		cn, err := node.Client.getConn(ctx)
  1650  		if err != nil {
  1651  			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
  1652  			setCmdsErr(cmds, err)
  1653  			return err
  1654  		}
  1655  
  1656  		var processErr error
  1657  		defer func() {
  1658  			node.Client.releaseConn(ctx, cn, processErr)
  1659  		}()
  1660  		processErr = c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
  1661  
  1662  		return processErr
  1663  	})
  1664  }
  1665  
  1666  func (c *ClusterClient) processTxPipelineNodeConn(
  1667  	ctx context.Context, _ *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
  1668  ) error {
  1669  	if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
  1670  		return writeCmds(wr, cmds)
  1671  	}); err != nil {
  1672  		if shouldRetry(err, true) {
  1673  			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
  1674  		}
  1675  		setCmdsErr(cmds, err)
  1676  		return err
  1677  	}
  1678  
  1679  	return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
  1680  		statusCmd := cmds[0].(*StatusCmd)
  1681  		// Trim multi and exec.
  1682  		trimmedCmds := cmds[1 : len(cmds)-1]
  1683  
  1684  		if err := c.txPipelineReadQueued(
  1685  			ctx, rd, statusCmd, trimmedCmds, failedCmds,
  1686  		); err != nil {
  1687  			setCmdsErr(cmds, err)
  1688  
  1689  			moved, ask, addr := isMovedError(err)
  1690  			if moved || ask {
  1691  				return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
  1692  			}
  1693  
  1694  			return err
  1695  		}
  1696  
  1697  		return pipelineReadCmds(rd, trimmedCmds)
  1698  	})
  1699  }
  1700  
  1701  func (c *ClusterClient) txPipelineReadQueued(
  1702  	ctx context.Context,
  1703  	rd *proto.Reader,
  1704  	statusCmd *StatusCmd,
  1705  	cmds []Cmder,
  1706  	failedCmds *cmdsMap,
  1707  ) error {
  1708  	// Parse queued replies.
  1709  	if err := statusCmd.readReply(rd); err != nil {
  1710  		return err
  1711  	}
  1712  
  1713  	for _, cmd := range cmds {
  1714  		err := statusCmd.readReply(rd)
  1715  		if err != nil {
  1716  			if c.checkMovedErr(ctx, cmd, err, failedCmds) {
  1717  				// will be processed later
  1718  				continue
  1719  			}
  1720  			cmd.SetErr(err)
  1721  			if !isRedisError(err) {
  1722  				return err
  1723  			}
  1724  		}
  1725  	}
  1726  
  1727  	// Parse number of replies.
  1728  	line, err := rd.ReadLine()
  1729  	if err != nil {
  1730  		if err == Nil {
  1731  			err = TxFailedErr
  1732  		}
  1733  		return err
  1734  	}
  1735  
  1736  	if line[0] != proto.RespArray {
  1737  		return fmt.Errorf("redis: expected '*', but got line %q", line)
  1738  	}
  1739  
  1740  	return nil
  1741  }
  1742  
  1743  func (c *ClusterClient) cmdsMoved(
  1744  	ctx context.Context, cmds []Cmder,
  1745  	moved, ask bool,
  1746  	addr string,
  1747  	failedCmds *cmdsMap,
  1748  ) error {
  1749  	node, err := c.nodes.GetOrCreate(addr)
  1750  	if err != nil {
  1751  		return err
  1752  	}
  1753  
  1754  	if moved {
  1755  		c.state.LazyReload()
  1756  		for _, cmd := range cmds {
  1757  			failedCmds.Add(node, cmd)
  1758  		}
  1759  		return nil
  1760  	}
  1761  
  1762  	if ask {
  1763  		for _, cmd := range cmds {
  1764  			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
  1765  		}
  1766  		return nil
  1767  	}
  1768  
  1769  	return nil
  1770  }
  1771  
  1772  func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
  1773  	if len(keys) == 0 {
  1774  		return fmt.Errorf("redis: Watch requires at least one key")
  1775  	}
  1776  
  1777  	slot := hashtag.Slot(keys[0])
  1778  	for _, key := range keys[1:] {
  1779  		if hashtag.Slot(key) != slot {
  1780  			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
  1781  			return err
  1782  		}
  1783  	}
  1784  
  1785  	node, err := c.slotMasterNode(ctx, slot)
  1786  	if err != nil {
  1787  		return err
  1788  	}
  1789  
  1790  	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
  1791  		if attempt > 0 {
  1792  			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  1793  				return err
  1794  			}
  1795  		}
  1796  
  1797  		err = node.Client.Watch(ctx, fn, keys...)
  1798  		if err == nil {
  1799  			break
  1800  		}
  1801  
  1802  		moved, ask, addr := isMovedError(err)
  1803  		if moved || ask {
  1804  			node, err = c.nodes.GetOrCreate(addr)
  1805  			if err != nil {
  1806  				return err
  1807  			}
  1808  			continue
  1809  		}
  1810  
  1811  		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
  1812  			if isReadOnly {
  1813  				c.state.LazyReload()
  1814  			}
  1815  			node, err = c.slotMasterNode(ctx, slot)
  1816  			if err != nil {
  1817  				return err
  1818  			}
  1819  			continue
  1820  		}
  1821  
  1822  		if shouldRetry(err, true) {
  1823  			continue
  1824  		}
  1825  
  1826  		return err
  1827  	}
  1828  
  1829  	return err
  1830  }
  1831  
  1832  func (c *ClusterClient) pubSub() *PubSub {
  1833  	var node *clusterNode
  1834  	pubsub := &PubSub{
  1835  		opt: c.opt.clientOptions(),
  1836  
  1837  		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
  1838  			if node != nil {
  1839  				panic("node != nil")
  1840  			}
  1841  
  1842  			var err error
  1843  
  1844  			if len(channels) > 0 {
  1845  				slot := hashtag.Slot(channels[0])
  1846  
  1847  				// newConn in PubSub is only used for subscription connections, so it is safe to
  1848  				// assume that a slave node can always be used when client options specify ReadOnly.
  1849  				if c.opt.ReadOnly {
  1850  					state, err := c.state.Get(ctx)
  1851  					if err != nil {
  1852  						return nil, err
  1853  					}
  1854  
  1855  					node, err = c.slotReadOnlyNode(state, slot)
  1856  					if err != nil {
  1857  						return nil, err
  1858  					}
  1859  				} else {
  1860  					node, err = c.slotMasterNode(ctx, slot)
  1861  					if err != nil {
  1862  						return nil, err
  1863  					}
  1864  				}
  1865  			} else {
  1866  				node, err = c.nodes.Random()
  1867  				if err != nil {
  1868  					return nil, err
  1869  				}
  1870  			}
  1871  
  1872  			cn, err := node.Client.newConn(context.TODO())
  1873  			if err != nil {
  1874  				node = nil
  1875  
  1876  				return nil, err
  1877  			}
  1878  
  1879  			return cn, nil
  1880  		},
  1881  		closeConn: func(cn *pool.Conn) error {
  1882  			err := node.Client.connPool.CloseConn(cn)
  1883  			node = nil
  1884  			return err
  1885  		},
  1886  	}
  1887  	pubsub.init()
  1888  
  1889  	return pubsub
  1890  }
  1891  
  1892  // Subscribe subscribes the client to the specified channels.
  1893  // Channels can be omitted to create empty subscription.
  1894  func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
  1895  	pubsub := c.pubSub()
  1896  	if len(channels) > 0 {
  1897  		_ = pubsub.Subscribe(ctx, channels...)
  1898  	}
  1899  	return pubsub
  1900  }
  1901  
  1902  // PSubscribe subscribes the client to the given patterns.
  1903  // Patterns can be omitted to create empty subscription.
  1904  func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
  1905  	pubsub := c.pubSub()
  1906  	if len(channels) > 0 {
  1907  		_ = pubsub.PSubscribe(ctx, channels...)
  1908  	}
  1909  	return pubsub
  1910  }
  1911  
  1912  // SSubscribe Subscribes the client to the specified shard channels.
  1913  func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) *PubSub {
  1914  	pubsub := c.pubSub()
  1915  	if len(channels) > 0 {
  1916  		_ = pubsub.SSubscribe(ctx, channels...)
  1917  	}
  1918  	return pubsub
  1919  }
  1920  
  1921  func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
  1922  	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  1923  }
  1924  
  1925  func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
  1926  	// Try 3 random nodes.
  1927  	const nodeLimit = 3
  1928  
  1929  	addrs, err := c.nodes.Addrs()
  1930  	if err != nil {
  1931  		return nil, err
  1932  	}
  1933  
  1934  	var firstErr error
  1935  
  1936  	perm := rand.Perm(len(addrs))
  1937  	if len(perm) > nodeLimit {
  1938  		perm = perm[:nodeLimit]
  1939  	}
  1940  
  1941  	for _, idx := range perm {
  1942  		addr := addrs[idx]
  1943  
  1944  		node, err := c.nodes.GetOrCreate(addr)
  1945  		if err != nil {
  1946  			if firstErr == nil {
  1947  				firstErr = err
  1948  			}
  1949  			continue
  1950  		}
  1951  
  1952  		info, err := node.Client.Command(ctx).Result()
  1953  		if err == nil {
  1954  			return info, nil
  1955  		}
  1956  		if firstErr == nil {
  1957  			firstErr = err
  1958  		}
  1959  	}
  1960  
  1961  	if firstErr == nil {
  1962  		panic("not reached")
  1963  	}
  1964  	return nil, firstErr
  1965  }
  1966  
  1967  func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
  1968  	cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
  1969  	if err != nil {
  1970  		internal.Logger.Printf(context.TODO(), "getting command info: %s", err)
  1971  		return nil
  1972  	}
  1973  
  1974  	info := cmdsInfo[name]
  1975  	if info == nil {
  1976  		internal.Logger.Printf(context.TODO(), "info for cmd=%s not found", name)
  1977  	}
  1978  	return info
  1979  }
  1980  
  1981  func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
  1982  	args := cmd.Args()
  1983  	if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
  1984  		return args[2].(int)
  1985  	}
  1986  
  1987  	return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
  1988  }
  1989  
  1990  func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
  1991  	if pos == 0 {
  1992  		if preferredRandomSlot != -1 {
  1993  			return preferredRandomSlot
  1994  		}
  1995  		return hashtag.RandomSlot()
  1996  	}
  1997  	firstKey := cmd.stringArg(pos)
  1998  	return hashtag.Slot(firstKey)
  1999  }
  2000  
  2001  func (c *ClusterClient) cmdNode(
  2002  	ctx context.Context,
  2003  	cmdName string,
  2004  	slot int,
  2005  ) (*clusterNode, error) {
  2006  	state, err := c.state.Get(ctx)
  2007  	if err != nil {
  2008  		return nil, err
  2009  	}
  2010  
  2011  	if c.opt.ReadOnly {
  2012  		cmdInfo := c.cmdInfo(ctx, cmdName)
  2013  		if cmdInfo != nil && cmdInfo.ReadOnly {
  2014  			return c.slotReadOnlyNode(state, slot)
  2015  		}
  2016  	}
  2017  	return state.slotMasterNode(slot)
  2018  }
  2019  
  2020  func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
  2021  	if c.opt.RouteByLatency {
  2022  		return state.slotClosestNode(slot)
  2023  	}
  2024  	if c.opt.RouteRandomly {
  2025  		return state.slotRandomNode(slot)
  2026  	}
  2027  	return state.slotSlaveNode(slot)
  2028  }
  2029  
  2030  func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
  2031  	state, err := c.state.Get(ctx)
  2032  	if err != nil {
  2033  		return nil, err
  2034  	}
  2035  	return state.slotMasterNode(slot)
  2036  }
  2037  
  2038  // SlaveForKey gets a client for a replica node to run any command on it.
  2039  // This is especially useful if we want to run a particular lua script which has
  2040  // only read only commands on the replica.
  2041  // This is because other redis commands generally have a flag that points that
  2042  // they are read only and automatically run on the replica nodes
  2043  // if ClusterOptions.ReadOnly flag is set to true.
  2044  func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
  2045  	state, err := c.state.Get(ctx)
  2046  	if err != nil {
  2047  		return nil, err
  2048  	}
  2049  	slot := hashtag.Slot(key)
  2050  	node, err := c.slotReadOnlyNode(state, slot)
  2051  	if err != nil {
  2052  		return nil, err
  2053  	}
  2054  	return node.Client, err
  2055  }
  2056  
  2057  // MasterForKey return a client to the master node for a particular key.
  2058  func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
  2059  	slot := hashtag.Slot(key)
  2060  	node, err := c.slotMasterNode(ctx, slot)
  2061  	if err != nil {
  2062  		return nil, err
  2063  	}
  2064  	return node.Client, nil
  2065  }
  2066  
  2067  func (c *ClusterClient) context(ctx context.Context) context.Context {
  2068  	if c.opt.ContextTimeoutEnabled {
  2069  		return ctx
  2070  	}
  2071  	return context.Background()
  2072  }
  2073  
  2074  func appendIfNotExist[T comparable](vals []T, newVal T) []T {
  2075  	for _, v := range vals {
  2076  		if v == newVal {
  2077  			return vals
  2078  		}
  2079  	}
  2080  	return append(vals, newVal)
  2081  }
  2082  
  2083  //------------------------------------------------------------------------------
  2084  
  2085  type cmdsMap struct {
  2086  	mu sync.Mutex
  2087  	m  map[*clusterNode][]Cmder
  2088  }
  2089  
  2090  func newCmdsMap() *cmdsMap {
  2091  	return &cmdsMap{
  2092  		m: make(map[*clusterNode][]Cmder),
  2093  	}
  2094  }
  2095  
  2096  func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
  2097  	m.mu.Lock()
  2098  	m.m[node] = append(m.m[node], cmds...)
  2099  	m.mu.Unlock()
  2100  }
  2101  

View as plain text