...

Source file src/github.com/redis/go-redis/v9/sentinel.go

Documentation: github.com/redis/go-redis/v9

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"errors"
     7  	"fmt"
     8  	"net"
     9  	"net/url"
    10  	"strconv"
    11  	"strings"
    12  	"sync"
    13  	"time"
    14  
    15  	"github.com/redis/go-redis/v9/auth"
    16  	"github.com/redis/go-redis/v9/internal"
    17  	"github.com/redis/go-redis/v9/internal/pool"
    18  	"github.com/redis/go-redis/v9/internal/rand"
    19  	"github.com/redis/go-redis/v9/internal/util"
    20  )
    21  
    22  //------------------------------------------------------------------------------
    23  
// FailoverOptions are used to configure a failover client and should
// be passed to NewFailoverClient. Some options (RouteByLatency,
// RouteRandomly) are only meaningful for NewFailoverClusterClient.
type FailoverOptions struct {
	// The master name, as monitored by the Sentinels. It is the name used in
	// SENTINEL GET-MASTER-ADDR-BY-NAME and SENTINEL REPLICAS queries.
	MasterName string
	// A seed list of host:port addresses of sentinel nodes.
	// NewFailoverClient works on a shuffled copy; this slice is not modified.
	SentinelAddrs []string

	// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
	// It is forwarded to both master/replica and sentinel connections.
	ClientName string

	// If specified with SentinelPassword, enables ACL-based authentication (via
	// AUTH <user> <pass>) on sentinel connections.
	SentinelUsername string
	// Sentinel password from "requirepass <password>" (if enabled) in Sentinel
	// configuration, or, if SentinelUsername is also supplied, used for ACL-based
	// authentication.
	SentinelPassword string

	// Allows routing read-only commands to the closest master or replica node.
	// This option only works with NewFailoverClusterClient;
	// NewFailoverClient panics when it is set.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or replica node.
	// This option only works with NewFailoverClusterClient;
	// NewFailoverClient panics when it is set.
	RouteRandomly bool

	// Route all commands to replica read-only nodes. With NewFailoverClient
	// every new connection is dialed to a random replica (falling back to the
	// master when no replica is found); for the cluster client it is passed
	// through as ClusterOptions.ReadOnly.
	ReplicaOnly bool

	// Use replicas that are disconnected from the master when no connected
	// replica can be found. Currently this option is only consulted by
	// RandomReplicaAddr.
	UseDisconnectedReplicas bool

	// The following options are copied from the Options struct. They apply to
	// master/replica connections; sentinel connections share the dialer,
	// OnConnect, retry, timeout, pool, TLS and identity settings, with the
	// differences noted on the individual fields below.

	// Dialer, if set, is used to dial master/replica connections (the TLSConfig
	// is then NOT applied by the sentinel-aware dialer) and is also passed to
	// sentinel connections.
	Dialer    func(ctx context.Context, network, addr string) (net.Conn, error)
	OnConnect func(ctx context.Context, cn *Conn) error

	// Protocol, Username and Password apply to master/replica connections only.
	// Sentinel connections authenticate with SentinelUsername/SentinelPassword
	// and do not receive Protocol.
	Protocol int
	Username string
	Password string
	// CredentialsProvider allows the username and password to be updated
	// before reconnecting. It should return the current username and password.
	// Not forwarded to sentinel connections.
	CredentialsProvider func() (username string, password string)

	// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
	// done to maintain API compatibility. In the future,
	// there might be a merge between CredentialsProviderContext and CredentialsProvider.
	// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
	// Not forwarded to sentinel connections.
	CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)

	// StreamingCredentialsProvider is used to retrieve the credentials
	// for the connection from an external source. Those credentials may change
	// during the connection lifetime. This is useful for managed identity
	// scenarios where the credentials are retrieved from an external source.
	//
	// Currently, this is a placeholder for the future implementation.
	StreamingCredentialsProvider auth.StreamingCredentialsProvider
	// DB is the database selected on master/replica connections. Sentinel
	// connections always use DB 0, and the cluster client does not receive it.
	DB                           int

	// MaxRetries is the retry count for master/replica and sentinel commands.
	// For the failover cluster client it is used as ClusterOptions.MaxRedirects.
	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout           time.Duration
	ReadTimeout           time.Duration
	WriteTimeout          time.Duration
	ContextTimeoutEnabled bool

	// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
	// Larger buffers can improve performance for commands that return large responses.
	// Smaller buffers can improve memory usage for larger pools.
	// Sentinel connections ignore it and always use a 4KiB buffer.
	//
	// default: 32KiB (32768 bytes)
	ReadBufferSize int

	// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
	// Larger buffers can improve performance for large pipelines and commands with many arguments.
	// Smaller buffers can improve memory usage for larger pools.
	// Sentinel connections ignore it and always use a 4KiB buffer.
	//
	// default: 32KiB (32768 bytes)
	WriteBufferSize int

	PoolFIFO bool

	PoolSize        int
	PoolTimeout     time.Duration
	MinIdleConns    int
	MaxIdleConns    int
	MaxActiveConns  int
	ConnMaxIdleTime time.Duration
	ConnMaxLifetime time.Duration

	// TLSConfig is applied by the built-in master/replica dialer (unless Dialer
	// is set) and is also passed to sentinel connections.
	TLSConfig *tls.Config

	// DisableIndentity - Disable set-lib on connect.
	//
	// default: false
	//
	// Deprecated: Use DisableIdentity instead.
	DisableIndentity bool

	// DisableIdentity is used to disable CLIENT SETINFO command on connect.
	//
	// default: false
	DisableIdentity bool

	// IdentitySuffix is forwarded to master/replica, sentinel and cluster options.
	IdentitySuffix string

	// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
	// When a node is marked as failing, it will be avoided for this duration.
	// Only applies to failover cluster clients. Default is 15 seconds.
	FailingTimeoutSeconds int

	// UnstableResp3 is forwarded to master/replica and sentinel connections.
	// NOTE(review): clusterOptions does not forward it — confirm whether
	// ClusterOptions supports it and whether that omission is intended.
	UnstableResp3 bool
}
   140  
   141  func (opt *FailoverOptions) clientOptions() *Options {
   142  	return &Options{
   143  		Addr:       "FailoverClient",
   144  		ClientName: opt.ClientName,
   145  
   146  		Dialer:    opt.Dialer,
   147  		OnConnect: opt.OnConnect,
   148  
   149  		DB:                           opt.DB,
   150  		Protocol:                     opt.Protocol,
   151  		Username:                     opt.Username,
   152  		Password:                     opt.Password,
   153  		CredentialsProvider:          opt.CredentialsProvider,
   154  		CredentialsProviderContext:   opt.CredentialsProviderContext,
   155  		StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
   156  
   157  		MaxRetries:      opt.MaxRetries,
   158  		MinRetryBackoff: opt.MinRetryBackoff,
   159  		MaxRetryBackoff: opt.MaxRetryBackoff,
   160  
   161  		ReadBufferSize:  opt.ReadBufferSize,
   162  		WriteBufferSize: opt.WriteBufferSize,
   163  
   164  		DialTimeout:           opt.DialTimeout,
   165  		ReadTimeout:           opt.ReadTimeout,
   166  		WriteTimeout:          opt.WriteTimeout,
   167  		ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
   168  
   169  		PoolFIFO:        opt.PoolFIFO,
   170  		PoolSize:        opt.PoolSize,
   171  		PoolTimeout:     opt.PoolTimeout,
   172  		MinIdleConns:    opt.MinIdleConns,
   173  		MaxIdleConns:    opt.MaxIdleConns,
   174  		MaxActiveConns:  opt.MaxActiveConns,
   175  		ConnMaxIdleTime: opt.ConnMaxIdleTime,
   176  		ConnMaxLifetime: opt.ConnMaxLifetime,
   177  
   178  		TLSConfig: opt.TLSConfig,
   179  
   180  		DisableIdentity:  opt.DisableIdentity,
   181  		DisableIndentity: opt.DisableIndentity,
   182  
   183  		IdentitySuffix: opt.IdentitySuffix,
   184  		UnstableResp3:  opt.UnstableResp3,
   185  	}
   186  }
   187  
   188  func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
   189  	return &Options{
   190  		Addr:       addr,
   191  		ClientName: opt.ClientName,
   192  
   193  		Dialer:    opt.Dialer,
   194  		OnConnect: opt.OnConnect,
   195  
   196  		DB:       0,
   197  		Username: opt.SentinelUsername,
   198  		Password: opt.SentinelPassword,
   199  
   200  		MaxRetries:      opt.MaxRetries,
   201  		MinRetryBackoff: opt.MinRetryBackoff,
   202  		MaxRetryBackoff: opt.MaxRetryBackoff,
   203  
   204  		// The sentinel client uses a 4KiB read/write buffer size.
   205  		ReadBufferSize:  4096,
   206  		WriteBufferSize: 4096,
   207  
   208  		DialTimeout:           opt.DialTimeout,
   209  		ReadTimeout:           opt.ReadTimeout,
   210  		WriteTimeout:          opt.WriteTimeout,
   211  		ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
   212  
   213  		PoolFIFO:        opt.PoolFIFO,
   214  		PoolSize:        opt.PoolSize,
   215  		PoolTimeout:     opt.PoolTimeout,
   216  		MinIdleConns:    opt.MinIdleConns,
   217  		MaxIdleConns:    opt.MaxIdleConns,
   218  		MaxActiveConns:  opt.MaxActiveConns,
   219  		ConnMaxIdleTime: opt.ConnMaxIdleTime,
   220  		ConnMaxLifetime: opt.ConnMaxLifetime,
   221  
   222  		TLSConfig: opt.TLSConfig,
   223  
   224  		DisableIdentity:  opt.DisableIdentity,
   225  		DisableIndentity: opt.DisableIndentity,
   226  
   227  		IdentitySuffix: opt.IdentitySuffix,
   228  		UnstableResp3:  opt.UnstableResp3,
   229  	}
   230  }
   231  
   232  func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
   233  	return &ClusterOptions{
   234  		ClientName: opt.ClientName,
   235  
   236  		Dialer:    opt.Dialer,
   237  		OnConnect: opt.OnConnect,
   238  
   239  		Protocol:                     opt.Protocol,
   240  		Username:                     opt.Username,
   241  		Password:                     opt.Password,
   242  		CredentialsProvider:          opt.CredentialsProvider,
   243  		CredentialsProviderContext:   opt.CredentialsProviderContext,
   244  		StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
   245  
   246  		MaxRedirects: opt.MaxRetries,
   247  
   248  		ReadOnly:       opt.ReplicaOnly,
   249  		RouteByLatency: opt.RouteByLatency,
   250  		RouteRandomly:  opt.RouteRandomly,
   251  
   252  		MinRetryBackoff: opt.MinRetryBackoff,
   253  		MaxRetryBackoff: opt.MaxRetryBackoff,
   254  
   255  		ReadBufferSize:  opt.ReadBufferSize,
   256  		WriteBufferSize: opt.WriteBufferSize,
   257  
   258  		DialTimeout:           opt.DialTimeout,
   259  		ReadTimeout:           opt.ReadTimeout,
   260  		WriteTimeout:          opt.WriteTimeout,
   261  		ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
   262  
   263  		PoolFIFO:        opt.PoolFIFO,
   264  		PoolSize:        opt.PoolSize,
   265  		PoolTimeout:     opt.PoolTimeout,
   266  		MinIdleConns:    opt.MinIdleConns,
   267  		MaxIdleConns:    opt.MaxIdleConns,
   268  		MaxActiveConns:  opt.MaxActiveConns,
   269  		ConnMaxIdleTime: opt.ConnMaxIdleTime,
   270  		ConnMaxLifetime: opt.ConnMaxLifetime,
   271  
   272  		TLSConfig: opt.TLSConfig,
   273  
   274  		DisableIdentity:       opt.DisableIdentity,
   275  		DisableIndentity:      opt.DisableIndentity,
   276  		IdentitySuffix:        opt.IdentitySuffix,
   277  		FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
   278  	}
   279  }
   280  
   281  // ParseFailoverURL parses a URL into FailoverOptions that can be used to connect to Redis.
   282  // The URL must be in the form:
   283  //
   284  //	redis://<user>:<password>@<host>:<port>/<db_number>
   285  //	or
   286  //	rediss://<user>:<password>@<host>:<port>/<db_number>
   287  //
   288  // To add additional addresses, specify the query parameter, "addr" one or more times. e.g:
   289  //
   290  //	redis://<user>:<password>@<host>:<port>/<db_number>?addr=<host2>:<port2>&addr=<host3>:<port3>
   291  //	or
   292  //	rediss://<user>:<password>@<host>:<port>/<db_number>?addr=<host2>:<port2>&addr=<host3>:<port3>
   293  //
   294  // Most Option fields can be set using query parameters, with the following restrictions:
   295  //   - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
   296  //   - only scalar type fields are supported (bool, int, time.Duration)
   297  //   - for time.Duration fields, values must be a valid input for time.ParseDuration();
   298  //     additionally a plain integer as value (i.e. without unit) is interpreted as seconds
   299  //   - to disable a duration field, use value less than or equal to 0; to use the default
   300  //     value, leave the value blank or remove the parameter
   301  //   - only the last value is interpreted if a parameter is given multiple times
   302  //   - fields "network", "addr", "sentinel_username" and "sentinel_password" can only be set using other
   303  //     URL attributes (scheme, host, userinfo, resp.), query parameters using these
   304  //     names will be treated as unknown parameters
   305  //   - unknown parameter names will result in an error
   306  //   - use "skip_verify=true" to ignore TLS certificate validation
   307  //
   308  // Example:
   309  //
   310  //	redis://user:password@localhost:6789?master_name=mymaster&dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
   311  //	is equivalent to:
   312  //	&FailoverOptions{
   313  //		MasterName:  "mymaster",
   314  //		Addr:        ["localhost:6789", "localhost:6790", "localhost:6791"]
   315  //		DialTimeout: 3 * time.Second, // no time unit = seconds
   316  //		ReadTimeout: 6 * time.Second,
   317  //	}
   318  func ParseFailoverURL(redisURL string) (*FailoverOptions, error) {
   319  	u, err := url.Parse(redisURL)
   320  	if err != nil {
   321  		return nil, err
   322  	}
   323  	return setupFailoverConn(u)
   324  }
   325  
   326  func setupFailoverConn(u *url.URL) (*FailoverOptions, error) {
   327  	o := &FailoverOptions{}
   328  
   329  	o.SentinelUsername, o.SentinelPassword = getUserPassword(u)
   330  
   331  	h, p := getHostPortWithDefaults(u)
   332  	o.SentinelAddrs = append(o.SentinelAddrs, net.JoinHostPort(h, p))
   333  
   334  	switch u.Scheme {
   335  	case "rediss":
   336  		o.TLSConfig = &tls.Config{ServerName: h, MinVersion: tls.VersionTLS12}
   337  	case "redis":
   338  		o.TLSConfig = nil
   339  	default:
   340  		return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
   341  	}
   342  
   343  	f := strings.FieldsFunc(u.Path, func(r rune) bool {
   344  		return r == '/'
   345  	})
   346  	switch len(f) {
   347  	case 0:
   348  		o.DB = 0
   349  	case 1:
   350  		var err error
   351  		if o.DB, err = strconv.Atoi(f[0]); err != nil {
   352  			return nil, fmt.Errorf("redis: invalid database number: %q", f[0])
   353  		}
   354  	default:
   355  		return nil, fmt.Errorf("redis: invalid URL path: %s", u.Path)
   356  	}
   357  
   358  	return setupFailoverConnParams(u, o)
   359  }
   360  
   361  func setupFailoverConnParams(u *url.URL, o *FailoverOptions) (*FailoverOptions, error) {
   362  	q := queryOptions{q: u.Query()}
   363  
   364  	o.MasterName = q.string("master_name")
   365  	o.ClientName = q.string("client_name")
   366  	o.RouteByLatency = q.bool("route_by_latency")
   367  	o.RouteRandomly = q.bool("route_randomly")
   368  	o.ReplicaOnly = q.bool("replica_only")
   369  	o.UseDisconnectedReplicas = q.bool("use_disconnected_replicas")
   370  	o.Protocol = q.int("protocol")
   371  	o.Username = q.string("username")
   372  	o.Password = q.string("password")
   373  	o.MaxRetries = q.int("max_retries")
   374  	o.MinRetryBackoff = q.duration("min_retry_backoff")
   375  	o.MaxRetryBackoff = q.duration("max_retry_backoff")
   376  	o.DialTimeout = q.duration("dial_timeout")
   377  	o.ReadTimeout = q.duration("read_timeout")
   378  	o.WriteTimeout = q.duration("write_timeout")
   379  	o.ContextTimeoutEnabled = q.bool("context_timeout_enabled")
   380  	o.PoolFIFO = q.bool("pool_fifo")
   381  	o.PoolSize = q.int("pool_size")
   382  	o.MinIdleConns = q.int("min_idle_conns")
   383  	o.MaxIdleConns = q.int("max_idle_conns")
   384  	o.MaxActiveConns = q.int("max_active_conns")
   385  	o.ConnMaxLifetime = q.duration("conn_max_lifetime")
   386  	o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
   387  	o.PoolTimeout = q.duration("pool_timeout")
   388  	o.DisableIdentity = q.bool("disableIdentity")
   389  	o.IdentitySuffix = q.string("identitySuffix")
   390  	o.UnstableResp3 = q.bool("unstable_resp3")
   391  
   392  	if q.err != nil {
   393  		return nil, q.err
   394  	}
   395  
   396  	if tmp := q.string("db"); tmp != "" {
   397  		db, err := strconv.Atoi(tmp)
   398  		if err != nil {
   399  			return nil, fmt.Errorf("redis: invalid database number: %w", err)
   400  		}
   401  		o.DB = db
   402  	}
   403  
   404  	addrs := q.strings("addr")
   405  	for _, addr := range addrs {
   406  		h, p, err := net.SplitHostPort(addr)
   407  		if err != nil || h == "" || p == "" {
   408  			return nil, fmt.Errorf("redis: unable to parse addr param: %s", addr)
   409  		}
   410  
   411  		o.SentinelAddrs = append(o.SentinelAddrs, net.JoinHostPort(h, p))
   412  	}
   413  
   414  	if o.TLSConfig != nil && q.has("skip_verify") {
   415  		o.TLSConfig.InsecureSkipVerify = q.bool("skip_verify")
   416  	}
   417  
   418  	// any parameters left?
   419  	if r := q.remaining(); len(r) > 0 {
   420  		return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
   421  	}
   422  
   423  	return o, nil
   424  }
   425  
   426  // NewFailoverClient returns a Redis client that uses Redis Sentinel
   427  // for automatic failover. It's safe for concurrent use by multiple
   428  // goroutines.
   429  func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
   430  	if failoverOpt == nil {
   431  		panic("redis: NewFailoverClient nil options")
   432  	}
   433  
   434  	if failoverOpt.RouteByLatency {
   435  		panic("to route commands by latency, use NewFailoverClusterClient")
   436  	}
   437  	if failoverOpt.RouteRandomly {
   438  		panic("to route commands randomly, use NewFailoverClusterClient")
   439  	}
   440  
   441  	sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
   442  	copy(sentinelAddrs, failoverOpt.SentinelAddrs)
   443  
   444  	rand.Shuffle(len(sentinelAddrs), func(i, j int) {
   445  		sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]
   446  	})
   447  
   448  	failover := &sentinelFailover{
   449  		opt:           failoverOpt,
   450  		sentinelAddrs: sentinelAddrs,
   451  	}
   452  
   453  	opt := failoverOpt.clientOptions()
   454  	opt.Dialer = masterReplicaDialer(failover)
   455  	opt.init()
   456  
   457  	var connPool *pool.ConnPool
   458  
   459  	rdb := &Client{
   460  		baseClient: &baseClient{
   461  			opt: opt,
   462  		},
   463  	}
   464  	rdb.init()
   465  
   466  	connPool = newConnPool(opt, rdb.dialHook)
   467  	rdb.connPool = connPool
   468  	rdb.onClose = rdb.wrappedOnClose(failover.Close)
   469  
   470  	failover.mu.Lock()
   471  	failover.onFailover = func(ctx context.Context, addr string) {
   472  		_ = connPool.Filter(func(cn *pool.Conn) bool {
   473  			return cn.RemoteAddr().String() != addr
   474  		})
   475  	}
   476  	failover.mu.Unlock()
   477  
   478  	return rdb
   479  }
   480  
   481  func masterReplicaDialer(
   482  	failover *sentinelFailover,
   483  ) func(ctx context.Context, network, addr string) (net.Conn, error) {
   484  	return func(ctx context.Context, network, _ string) (net.Conn, error) {
   485  		var addr string
   486  		var err error
   487  
   488  		if failover.opt.ReplicaOnly {
   489  			addr, err = failover.RandomReplicaAddr(ctx)
   490  		} else {
   491  			addr, err = failover.MasterAddr(ctx)
   492  			if err == nil {
   493  				failover.trySwitchMaster(ctx, addr)
   494  			}
   495  		}
   496  		if err != nil {
   497  			return nil, err
   498  		}
   499  		if failover.opt.Dialer != nil {
   500  			return failover.opt.Dialer(ctx, network, addr)
   501  		}
   502  
   503  		netDialer := &net.Dialer{
   504  			Timeout:   failover.opt.DialTimeout,
   505  			KeepAlive: 5 * time.Minute,
   506  		}
   507  		if failover.opt.TLSConfig == nil {
   508  			return netDialer.DialContext(ctx, network, addr)
   509  		}
   510  		return tls.DialWithDialer(netDialer, network, addr, failover.opt.TLSConfig)
   511  	}
   512  }
   513  
   514  //------------------------------------------------------------------------------
   515  
// SentinelClient is a client for a Redis Sentinel.
//
// It embeds *baseClient for connection handling and exposes Ping, Pub/Sub
// subscriptions and wrappers for SENTINEL subcommands (GetMasterAddrByName,
// Sentinels, Failover, Reset, Replicas, ...). Create one with NewSentinelClient.
type SentinelClient struct {
	*baseClient
}
   520  
   521  func NewSentinelClient(opt *Options) *SentinelClient {
   522  	if opt == nil {
   523  		panic("redis: NewSentinelClient nil options")
   524  	}
   525  	opt.init()
   526  	c := &SentinelClient{
   527  		baseClient: &baseClient{
   528  			opt: opt,
   529  		},
   530  	}
   531  
   532  	c.initHooks(hooks{
   533  		dial:    c.baseClient.dial,
   534  		process: c.baseClient.process,
   535  	})
   536  	c.connPool = newConnPool(opt, c.dialHook)
   537  
   538  	return c
   539  }
   540  
   541  func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
   542  	err := c.processHook(ctx, cmd)
   543  	cmd.SetErr(err)
   544  	return err
   545  }
   546  
   547  func (c *SentinelClient) pubSub() *PubSub {
   548  	pubsub := &PubSub{
   549  		opt: c.opt,
   550  
   551  		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
   552  			return c.newConn(ctx)
   553  		},
   554  		closeConn: c.connPool.CloseConn,
   555  	}
   556  	pubsub.init()
   557  	return pubsub
   558  }
   559  
   560  // Ping is used to test if a connection is still alive, or to
   561  // measure latency.
   562  func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
   563  	cmd := NewStringCmd(ctx, "ping")
   564  	_ = c.Process(ctx, cmd)
   565  	return cmd
   566  }
   567  
   568  // Subscribe subscribes the client to the specified channels.
   569  // Channels can be omitted to create empty subscription.
   570  func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
   571  	pubsub := c.pubSub()
   572  	if len(channels) > 0 {
   573  		_ = pubsub.Subscribe(ctx, channels...)
   574  	}
   575  	return pubsub
   576  }
   577  
   578  // PSubscribe subscribes the client to the given patterns.
   579  // Patterns can be omitted to create empty subscription.
   580  func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
   581  	pubsub := c.pubSub()
   582  	if len(channels) > 0 {
   583  		_ = pubsub.PSubscribe(ctx, channels...)
   584  	}
   585  	return pubsub
   586  }
   587  
   588  func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
   589  	cmd := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
   590  	_ = c.Process(ctx, cmd)
   591  	return cmd
   592  }
   593  
   594  func (c *SentinelClient) Sentinels(ctx context.Context, name string) *MapStringStringSliceCmd {
   595  	cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "sentinels", name)
   596  	_ = c.Process(ctx, cmd)
   597  	return cmd
   598  }
   599  
   600  // Failover forces a failover as if the master was not reachable, and without
   601  // asking for agreement to other Sentinels.
   602  func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
   603  	cmd := NewStatusCmd(ctx, "sentinel", "failover", name)
   604  	_ = c.Process(ctx, cmd)
   605  	return cmd
   606  }
   607  
   608  // Reset resets all the masters with matching name. The pattern argument is a
   609  // glob-style pattern. The reset process clears any previous state in a master
   610  // (including a failover in progress), and removes every replica and sentinel
   611  // already discovered and associated with the master.
   612  func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
   613  	cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
   614  	_ = c.Process(ctx, cmd)
   615  	return cmd
   616  }
   617  
   618  // FlushConfig forces Sentinel to rewrite its configuration on disk, including
   619  // the current Sentinel state.
   620  func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
   621  	cmd := NewStatusCmd(ctx, "sentinel", "flushconfig")
   622  	_ = c.Process(ctx, cmd)
   623  	return cmd
   624  }
   625  
   626  // Master shows the state and info of the specified master.
   627  func (c *SentinelClient) Master(ctx context.Context, name string) *MapStringStringCmd {
   628  	cmd := NewMapStringStringCmd(ctx, "sentinel", "master", name)
   629  	_ = c.Process(ctx, cmd)
   630  	return cmd
   631  }
   632  
   633  // Masters shows a list of monitored masters and their state.
   634  func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
   635  	cmd := NewSliceCmd(ctx, "sentinel", "masters")
   636  	_ = c.Process(ctx, cmd)
   637  	return cmd
   638  }
   639  
   640  // Replicas shows a list of replicas for the specified master and their state.
   641  func (c *SentinelClient) Replicas(ctx context.Context, name string) *MapStringStringSliceCmd {
   642  	cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "replicas", name)
   643  	_ = c.Process(ctx, cmd)
   644  	return cmd
   645  }
   646  
   647  // CkQuorum checks if the current Sentinel configuration is able to reach the
   648  // quorum needed to failover a master, and the majority needed to authorize the
   649  // failover. This command should be used in monitoring systems to check if a
   650  // Sentinel deployment is ok.
   651  func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
   652  	cmd := NewStringCmd(ctx, "sentinel", "ckquorum", name)
   653  	_ = c.Process(ctx, cmd)
   654  	return cmd
   655  }
   656  
   657  // Monitor tells the Sentinel to start monitoring a new master with the specified
   658  // name, ip, port, and quorum.
   659  func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
   660  	cmd := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
   661  	_ = c.Process(ctx, cmd)
   662  	return cmd
   663  }
   664  
   665  // Set is used in order to change configuration parameters of a specific master.
   666  func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
   667  	cmd := NewStringCmd(ctx, "sentinel", "set", name, option, value)
   668  	_ = c.Process(ctx, cmd)
   669  	return cmd
   670  }
   671  
   672  // Remove is used in order to remove the specified master: the master will no
   673  // longer be monitored, and will totally be removed from the internal state of
   674  // the Sentinel.
   675  func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
   676  	cmd := NewStringCmd(ctx, "sentinel", "remove", name)
   677  	_ = c.Process(ctx, cmd)
   678  	return cmd
   679  }
   680  
   681  //------------------------------------------------------------------------------
   682  
// sentinelFailover holds the Sentinel-side state of a failover client: the
// configuration, the sentinel address list and a cached connection to the
// sentinel currently in use. It resolves master and replica addresses
// (MasterAddr, RandomReplicaAddr) on behalf of the client's dialer.
type sentinelFailover struct {
	// opt is the caller's configuration; the pointer is shared, not copied.
	opt *FailoverOptions

	// sentinelAddrs is the (shuffled) list of sentinel addresses. MasterAddr
	// moves the first sentinel that answers to index 0.
	sentinelAddrs []string

	// onFailover is a callback taking an address. NewFailoverClient sets it,
	// under mu, to filter its connection pool by remote address.
	// NOTE(review): the caller that invokes it is outside this view.
	onFailover func(ctx context.Context, addr string)
	// onUpdate is an update callback; NOTE(review): not set or used in the
	// visible code — confirm its users.
	onUpdate   func(ctx context.Context)

	// mu guards sentinel and pubsub (read under RLock in MasterAddr, replaced
	// under Lock in Close/closeSentinel); presumably masterAddr too — confirm.
	mu         sync.RWMutex
	// masterAddr is presumably the last known master address — confirm
	// against trySwitchMaster.
	masterAddr string
	// sentinel is the cached client for the sentinel in use, or nil.
	sentinel   *SentinelClient
	// pubsub is the subscription held alongside sentinel; closeSentinel
	// closes both and resets them to nil.
	pubsub     *PubSub
}
   696  
   697  func (c *sentinelFailover) Close() error {
   698  	c.mu.Lock()
   699  	defer c.mu.Unlock()
   700  	if c.sentinel != nil {
   701  		return c.closeSentinel()
   702  	}
   703  	return nil
   704  }
   705  
   706  func (c *sentinelFailover) closeSentinel() error {
   707  	firstErr := c.pubsub.Close()
   708  	c.pubsub = nil
   709  
   710  	err := c.sentinel.Close()
   711  	if err != nil && firstErr == nil {
   712  		firstErr = err
   713  	}
   714  	c.sentinel = nil
   715  
   716  	return firstErr
   717  }
   718  
   719  func (c *sentinelFailover) RandomReplicaAddr(ctx context.Context) (string, error) {
   720  	if c.opt == nil {
   721  		return "", errors.New("opt is nil")
   722  	}
   723  
   724  	addresses, err := c.replicaAddrs(ctx, false)
   725  	if err != nil {
   726  		return "", err
   727  	}
   728  
   729  	if len(addresses) == 0 && c.opt.UseDisconnectedReplicas {
   730  		addresses, err = c.replicaAddrs(ctx, true)
   731  		if err != nil {
   732  			return "", err
   733  		}
   734  	}
   735  
   736  	if len(addresses) == 0 {
   737  		return c.MasterAddr(ctx)
   738  	}
   739  	return addresses[rand.Intn(len(addresses))], nil
   740  }
   741  
   742  func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
   743  	c.mu.RLock()
   744  	sentinel := c.sentinel
   745  	c.mu.RUnlock()
   746  
   747  	if sentinel != nil {
   748  		addr, err := c.getMasterAddr(ctx, sentinel)
   749  		if err != nil {
   750  			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   751  				return "", err
   752  			}
   753  			// Continue on other errors
   754  			internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
   755  				c.opt.MasterName, err)
   756  		} else {
   757  			return addr, nil
   758  		}
   759  	}
   760  
   761  	c.mu.Lock()
   762  	defer c.mu.Unlock()
   763  
   764  	if c.sentinel != nil {
   765  		addr, err := c.getMasterAddr(ctx, c.sentinel)
   766  		if err != nil {
   767  			_ = c.closeSentinel()
   768  			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   769  				return "", err
   770  			}
   771  			// Continue on other errors
   772  			internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
   773  				c.opt.MasterName, err)
   774  		} else {
   775  			return addr, nil
   776  		}
   777  	}
   778  
   779  	var (
   780  		masterAddr string
   781  		wg         sync.WaitGroup
   782  		once       sync.Once
   783  		errCh      = make(chan error, len(c.sentinelAddrs))
   784  	)
   785  
   786  	ctx, cancel := context.WithCancel(ctx)
   787  	defer cancel()
   788  
   789  	for i, sentinelAddr := range c.sentinelAddrs {
   790  		wg.Add(1)
   791  		go func(i int, addr string) {
   792  			defer wg.Done()
   793  			sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
   794  			addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
   795  			if err != nil {
   796  				internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
   797  					addr, c.opt.MasterName, err)
   798  				_ = sentinelCli.Close()
   799  				errCh <- err
   800  				return
   801  			}
   802  			once.Do(func() {
   803  				masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
   804  				// Push working sentinel to the top
   805  				c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
   806  				c.setSentinel(ctx, sentinelCli)
   807  				internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
   808  				cancel()
   809  			})
   810  		}(i, sentinelAddr)
   811  	}
   812  
   813  	wg.Wait()
   814  	close(errCh)
   815  	if masterAddr != "" {
   816  		return masterAddr, nil
   817  	}
   818  	errs := make([]error, 0, len(errCh))
   819  	for err := range errCh {
   820  		errs = append(errs, err)
   821  	}
   822  	return "", fmt.Errorf("redis: all sentinels specified in configuration are unreachable: %s", joinErrors(errs))
   823  }
   824  
   825  func joinErrors(errs []error) string {
   826  	if len(errs) == 1 {
   827  		return errs[0].Error()
   828  	}
   829  
   830  	b := []byte(errs[0].Error())
   831  	for _, err := range errs[1:] {
   832  		b = append(b, '\n')
   833  		b = append(b, err.Error()...)
   834  	}
   835  	return util.BytesToString(b)
   836  }
   837  
   838  func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
   839  	c.mu.RLock()
   840  	sentinel := c.sentinel
   841  	c.mu.RUnlock()
   842  
   843  	if sentinel != nil {
   844  		addrs, err := c.getReplicaAddrs(ctx, sentinel)
   845  		if err != nil {
   846  			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   847  				return nil, err
   848  			}
   849  			// Continue on other errors
   850  			internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
   851  				c.opt.MasterName, err)
   852  		} else if len(addrs) > 0 {
   853  			return addrs, nil
   854  		}
   855  	}
   856  
   857  	c.mu.Lock()
   858  	defer c.mu.Unlock()
   859  
   860  	if c.sentinel != nil {
   861  		addrs, err := c.getReplicaAddrs(ctx, c.sentinel)
   862  		if err != nil {
   863  			_ = c.closeSentinel()
   864  			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   865  				return nil, err
   866  			}
   867  			// Continue on other errors
   868  			internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
   869  				c.opt.MasterName, err)
   870  		} else if len(addrs) > 0 {
   871  			return addrs, nil
   872  		} else {
   873  			// No error and no replicas.
   874  			_ = c.closeSentinel()
   875  		}
   876  	}
   877  
   878  	var sentinelReachable bool
   879  
   880  	for i, sentinelAddr := range c.sentinelAddrs {
   881  		sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
   882  
   883  		replicas, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
   884  		if err != nil {
   885  			_ = sentinel.Close()
   886  			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   887  				return nil, err
   888  			}
   889  			internal.Logger.Printf(ctx, "sentinel: Replicas master=%q failed: %s",
   890  				c.opt.MasterName, err)
   891  			continue
   892  		}
   893  		sentinelReachable = true
   894  		addrs := parseReplicaAddrs(replicas, useDisconnected)
   895  		if len(addrs) == 0 {
   896  			continue
   897  		}
   898  		// Push working sentinel to the top.
   899  		c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
   900  		c.setSentinel(ctx, sentinel)
   901  
   902  		return addrs, nil
   903  	}
   904  
   905  	if sentinelReachable {
   906  		return []string{}, nil
   907  	}
   908  	return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
   909  }
   910  
   911  func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) (string, error) {
   912  	addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
   913  	if err != nil {
   914  		return "", err
   915  	}
   916  	return net.JoinHostPort(addr[0], addr[1]), nil
   917  }
   918  
   919  func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *SentinelClient) ([]string, error) {
   920  	addrs, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
   921  	if err != nil {
   922  		internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
   923  			c.opt.MasterName, err)
   924  		return nil, err
   925  	}
   926  	return parseReplicaAddrs(addrs, false), nil
   927  }
   928  
   929  func parseReplicaAddrs(addrs []map[string]string, keepDisconnected bool) []string {
   930  	nodes := make([]string, 0, len(addrs))
   931  	for _, node := range addrs {
   932  		isDown := false
   933  		if flags, ok := node["flags"]; ok {
   934  			for _, flag := range strings.Split(flags, ",") {
   935  				switch flag {
   936  				case "s_down", "o_down":
   937  					isDown = true
   938  				case "disconnected":
   939  					if !keepDisconnected {
   940  						isDown = true
   941  					}
   942  				}
   943  			}
   944  		}
   945  		if !isDown && node["ip"] != "" && node["port"] != "" {
   946  			nodes = append(nodes, net.JoinHostPort(node["ip"], node["port"]))
   947  		}
   948  	}
   949  
   950  	return nodes
   951  }
   952  
   953  func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
   954  	c.mu.RLock()
   955  	currentAddr := c.masterAddr //nolint:ifshort
   956  	c.mu.RUnlock()
   957  
   958  	if addr == currentAddr {
   959  		return
   960  	}
   961  
   962  	c.mu.Lock()
   963  	defer c.mu.Unlock()
   964  
   965  	if addr == c.masterAddr {
   966  		return
   967  	}
   968  	c.masterAddr = addr
   969  
   970  	internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
   971  		c.opt.MasterName, addr)
   972  	if c.onFailover != nil {
   973  		c.onFailover(ctx, addr)
   974  	}
   975  }
   976  
   977  func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
   978  	if c.sentinel != nil {
   979  		panic("not reached")
   980  	}
   981  	c.sentinel = sentinel
   982  	c.discoverSentinels(ctx)
   983  
   984  	c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+replica-reconf-done")
   985  	go c.listen(c.pubsub)
   986  }
   987  
   988  func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
   989  	sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
   990  	if err != nil {
   991  		internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
   992  		return
   993  	}
   994  	for _, sentinel := range sentinels {
   995  		ip, ok := sentinel["ip"]
   996  		if !ok {
   997  			continue
   998  		}
   999  		port, ok := sentinel["port"]
  1000  		if !ok {
  1001  			continue
  1002  		}
  1003  		if ip != "" && port != "" {
  1004  			sentinelAddr := net.JoinHostPort(ip, port)
  1005  			if !contains(c.sentinelAddrs, sentinelAddr) {
  1006  				internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
  1007  					sentinelAddr, c.opt.MasterName)
  1008  				c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
  1009  			}
  1010  		}
  1011  	}
  1012  }
  1013  
  1014  func (c *sentinelFailover) listen(pubsub *PubSub) {
  1015  	ctx := context.TODO()
  1016  
  1017  	if c.onUpdate != nil {
  1018  		c.onUpdate(ctx)
  1019  	}
  1020  
  1021  	ch := pubsub.Channel()
  1022  	for msg := range ch {
  1023  		if msg.Channel == "+switch-master" {
  1024  			parts := strings.Split(msg.Payload, " ")
  1025  			if parts[0] != c.opt.MasterName {
  1026  				internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
  1027  				continue
  1028  			}
  1029  			addr := net.JoinHostPort(parts[3], parts[4])
  1030  			c.trySwitchMaster(pubsub.getContext(), addr)
  1031  		}
  1032  
  1033  		if c.onUpdate != nil {
  1034  			c.onUpdate(ctx)
  1035  		}
  1036  	}
  1037  }
  1038  
  1039  func contains(slice []string, str string) bool {
  1040  	for _, s := range slice {
  1041  		if s == str {
  1042  			return true
  1043  		}
  1044  	}
  1045  	return false
  1046  }
  1047  
  1048  //------------------------------------------------------------------------------
  1049  
  1050  // NewFailoverClusterClient returns a client that supports routing read-only commands
  1051  // to a replica node.
  1052  func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
  1053  	if failoverOpt == nil {
  1054  		panic("redis: NewFailoverClusterClient nil options")
  1055  	}
  1056  
  1057  	sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
  1058  	copy(sentinelAddrs, failoverOpt.SentinelAddrs)
  1059  
  1060  	failover := &sentinelFailover{
  1061  		opt:           failoverOpt,
  1062  		sentinelAddrs: sentinelAddrs,
  1063  	}
  1064  
  1065  	opt := failoverOpt.clusterOptions()
  1066  	if failoverOpt.DB != 0 {
  1067  		onConnect := opt.OnConnect
  1068  
  1069  		opt.OnConnect = func(ctx context.Context, cn *Conn) error {
  1070  			if err := cn.Select(ctx, failoverOpt.DB).Err(); err != nil {
  1071  				return err
  1072  			}
  1073  
  1074  			if onConnect != nil {
  1075  				return onConnect(ctx, cn)
  1076  			}
  1077  
  1078  			return nil
  1079  		}
  1080  	}
  1081  
  1082  	opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
  1083  		masterAddr, err := failover.MasterAddr(ctx)
  1084  		if err != nil {
  1085  			return nil, err
  1086  		}
  1087  
  1088  		nodes := []ClusterNode{{
  1089  			Addr: masterAddr,
  1090  		}}
  1091  
  1092  		replicaAddrs, err := failover.replicaAddrs(ctx, false)
  1093  		if err != nil {
  1094  			return nil, err
  1095  		}
  1096  
  1097  		for _, replicaAddr := range replicaAddrs {
  1098  			nodes = append(nodes, ClusterNode{
  1099  				Addr: replicaAddr,
  1100  			})
  1101  		}
  1102  
  1103  		slots := []ClusterSlot{
  1104  			{
  1105  				Start: 0,
  1106  				End:   16383,
  1107  				Nodes: nodes,
  1108  			},
  1109  		}
  1110  		return slots, nil
  1111  	}
  1112  
  1113  	c := NewClusterClient(opt)
  1114  
  1115  	failover.mu.Lock()
  1116  	failover.onUpdate = func(ctx context.Context) {
  1117  		c.ReloadState(ctx)
  1118  	}
  1119  	failover.mu.Unlock()
  1120  
  1121  	return c
  1122  }
  1123  

View as plain text