...

Source file src/github.com/redis/go-redis/v9/universal.go

Documentation: github.com/redis/go-redis/v9

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"net"
     7  	"time"
     8  
     9  	"github.com/redis/go-redis/v9/auth"
    10  )
    11  
    12  // UniversalOptions information is required by UniversalClient to establish
    13  // connections.
    14  type UniversalOptions struct {
    15  	// Either a single address or a seed list of host:port addresses
    16  	// of cluster/sentinel nodes.
    17  	Addrs []string
    18  
    19  	// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
    20  	ClientName string
    21  
    22  	// Database to be selected after connecting to the server.
    23  	// Only single-node and failover clients.
    24  	DB int
    25  
    26  	// Common options.
    27  
    28  	Dialer    func(ctx context.Context, network, addr string) (net.Conn, error)
    29  	OnConnect func(ctx context.Context, cn *Conn) error
    30  
    31  	Protocol int
    32  	Username string
    33  	Password string
    34  	// CredentialsProvider allows the username and password to be updated
    35  	// before reconnecting. It should return the current username and password.
    36  	CredentialsProvider func() (username string, password string)
    37  
    38  	// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
    39  	// done to maintain API compatibility. In the future,
    40  	// there might be a merge between CredentialsProviderContext and CredentialsProvider.
    41  	// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
    42  	CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
    43  
    44  	// StreamingCredentialsProvider is used to retrieve the credentials
    45  	// for the connection from an external source. Those credentials may change
    46  	// during the connection lifetime. This is useful for managed identity
    47  	// scenarios where the credentials are retrieved from an external source.
    48  	//
    49  	// Currently, this is a placeholder for the future implementation.
    50  	StreamingCredentialsProvider auth.StreamingCredentialsProvider
    51  
    52  	SentinelUsername string
    53  	SentinelPassword string
    54  
    55  	MaxRetries      int
    56  	MinRetryBackoff time.Duration
    57  	MaxRetryBackoff time.Duration
    58  
    59  	DialTimeout           time.Duration
    60  	ReadTimeout           time.Duration
    61  	WriteTimeout          time.Duration
    62  	ContextTimeoutEnabled bool
    63  
    64  	// ReadBufferSize is the size of the bufio.Reader buffer for each connection.
    65  	// Larger buffers can improve performance for commands that return large responses.
    66  	// Smaller buffers can improve memory usage for larger pools.
    67  	//
    68  	// default: 32KiB (32768 bytes)
    69  	ReadBufferSize int
    70  
    71  	// WriteBufferSize is the size of the bufio.Writer buffer for each connection.
    72  	// Larger buffers can improve performance for large pipelines and commands with many arguments.
    73  	// Smaller buffers can improve memory usage for larger pools.
    74  	//
    75  	// default: 32KiB (32768 bytes)
    76  	WriteBufferSize int
    77  
    78  	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
    79  	PoolFIFO bool
    80  
    81  	PoolSize        int
    82  	PoolTimeout     time.Duration
    83  	MinIdleConns    int
    84  	MaxIdleConns    int
    85  	MaxActiveConns  int
    86  	ConnMaxIdleTime time.Duration
    87  	ConnMaxLifetime time.Duration
    88  
    89  	TLSConfig *tls.Config
    90  
    91  	// Only cluster clients.
    92  
    93  	MaxRedirects   int
    94  	ReadOnly       bool
    95  	RouteByLatency bool
    96  	RouteRandomly  bool
    97  
    98  	// MasterName is the sentinel master name.
    99  	// Only for failover clients.
   100  	MasterName string
   101  
   102  	// DisableIndentity - Disable set-lib on connect.
   103  	//
   104  	// default: false
   105  	//
   106  	// Deprecated: Use DisableIdentity instead.
   107  	DisableIndentity bool
   108  
   109  	// DisableIdentity is used to disable CLIENT SETINFO command on connect.
   110  	//
   111  	// default: false
   112  	DisableIdentity bool
   113  
   114  	IdentitySuffix string
   115  
   116  	// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
   117  	// When a node is marked as failing, it will be avoided for this duration.
   118  	// Only applies to cluster clients. Default is 15 seconds.
   119  	FailingTimeoutSeconds int
   120  
   121  	UnstableResp3 bool
   122  
   123  	// IsClusterMode can be used when only one Addrs is provided (e.g. Elasticache supports setting up cluster mode with configuration endpoint).
   124  	IsClusterMode bool
   125  }
   126  
   127  // Cluster returns cluster options created from the universal options.
   128  func (o *UniversalOptions) Cluster() *ClusterOptions {
   129  	if len(o.Addrs) == 0 {
   130  		o.Addrs = []string{"127.0.0.1:6379"}
   131  	}
   132  
   133  	return &ClusterOptions{
   134  		Addrs:      o.Addrs,
   135  		ClientName: o.ClientName,
   136  		Dialer:     o.Dialer,
   137  		OnConnect:  o.OnConnect,
   138  
   139  		Protocol:                     o.Protocol,
   140  		Username:                     o.Username,
   141  		Password:                     o.Password,
   142  		CredentialsProvider:          o.CredentialsProvider,
   143  		CredentialsProviderContext:   o.CredentialsProviderContext,
   144  		StreamingCredentialsProvider: o.StreamingCredentialsProvider,
   145  
   146  		MaxRedirects:   o.MaxRedirects,
   147  		ReadOnly:       o.ReadOnly,
   148  		RouteByLatency: o.RouteByLatency,
   149  		RouteRandomly:  o.RouteRandomly,
   150  
   151  		MaxRetries:      o.MaxRetries,
   152  		MinRetryBackoff: o.MinRetryBackoff,
   153  		MaxRetryBackoff: o.MaxRetryBackoff,
   154  
   155  		DialTimeout:           o.DialTimeout,
   156  		ReadTimeout:           o.ReadTimeout,
   157  		WriteTimeout:          o.WriteTimeout,
   158  		ContextTimeoutEnabled: o.ContextTimeoutEnabled,
   159  
   160  		ReadBufferSize:  o.ReadBufferSize,
   161  		WriteBufferSize: o.WriteBufferSize,
   162  
   163  		PoolFIFO: o.PoolFIFO,
   164  
   165  		PoolSize:        o.PoolSize,
   166  		PoolTimeout:     o.PoolTimeout,
   167  		MinIdleConns:    o.MinIdleConns,
   168  		MaxIdleConns:    o.MaxIdleConns,
   169  		MaxActiveConns:  o.MaxActiveConns,
   170  		ConnMaxIdleTime: o.ConnMaxIdleTime,
   171  		ConnMaxLifetime: o.ConnMaxLifetime,
   172  
   173  		TLSConfig: o.TLSConfig,
   174  
   175  		DisableIdentity:       o.DisableIdentity,
   176  		DisableIndentity:      o.DisableIndentity,
   177  		IdentitySuffix:        o.IdentitySuffix,
   178  		FailingTimeoutSeconds: o.FailingTimeoutSeconds,
   179  		UnstableResp3:         o.UnstableResp3,
   180  	}
   181  }
   182  
   183  // Failover returns failover options created from the universal options.
   184  func (o *UniversalOptions) Failover() *FailoverOptions {
   185  	if len(o.Addrs) == 0 {
   186  		o.Addrs = []string{"127.0.0.1:26379"}
   187  	}
   188  
   189  	return &FailoverOptions{
   190  		SentinelAddrs: o.Addrs,
   191  		MasterName:    o.MasterName,
   192  		ClientName:    o.ClientName,
   193  
   194  		Dialer:    o.Dialer,
   195  		OnConnect: o.OnConnect,
   196  
   197  		DB:                           o.DB,
   198  		Protocol:                     o.Protocol,
   199  		Username:                     o.Username,
   200  		Password:                     o.Password,
   201  		CredentialsProvider:          o.CredentialsProvider,
   202  		CredentialsProviderContext:   o.CredentialsProviderContext,
   203  		StreamingCredentialsProvider: o.StreamingCredentialsProvider,
   204  
   205  		SentinelUsername: o.SentinelUsername,
   206  		SentinelPassword: o.SentinelPassword,
   207  
   208  		RouteByLatency: o.RouteByLatency,
   209  		RouteRandomly:  o.RouteRandomly,
   210  
   211  		MaxRetries:      o.MaxRetries,
   212  		MinRetryBackoff: o.MinRetryBackoff,
   213  		MaxRetryBackoff: o.MaxRetryBackoff,
   214  
   215  		DialTimeout:           o.DialTimeout,
   216  		ReadTimeout:           o.ReadTimeout,
   217  		WriteTimeout:          o.WriteTimeout,
   218  		ContextTimeoutEnabled: o.ContextTimeoutEnabled,
   219  
   220  		ReadBufferSize:  o.ReadBufferSize,
   221  		WriteBufferSize: o.WriteBufferSize,
   222  
   223  		PoolFIFO:        o.PoolFIFO,
   224  		PoolSize:        o.PoolSize,
   225  		PoolTimeout:     o.PoolTimeout,
   226  		MinIdleConns:    o.MinIdleConns,
   227  		MaxIdleConns:    o.MaxIdleConns,
   228  		MaxActiveConns:  o.MaxActiveConns,
   229  		ConnMaxIdleTime: o.ConnMaxIdleTime,
   230  		ConnMaxLifetime: o.ConnMaxLifetime,
   231  
   232  		TLSConfig: o.TLSConfig,
   233  
   234  		ReplicaOnly: o.ReadOnly,
   235  
   236  		DisableIdentity:  o.DisableIdentity,
   237  		DisableIndentity: o.DisableIndentity,
   238  		IdentitySuffix:   o.IdentitySuffix,
   239  		UnstableResp3:    o.UnstableResp3,
   240  	}
   241  }
   242  
   243  // Simple returns basic options created from the universal options.
   244  func (o *UniversalOptions) Simple() *Options {
   245  	addr := "127.0.0.1:6379"
   246  	if len(o.Addrs) > 0 {
   247  		addr = o.Addrs[0]
   248  	}
   249  
   250  	return &Options{
   251  		Addr:       addr,
   252  		ClientName: o.ClientName,
   253  		Dialer:     o.Dialer,
   254  		OnConnect:  o.OnConnect,
   255  
   256  		DB:                           o.DB,
   257  		Protocol:                     o.Protocol,
   258  		Username:                     o.Username,
   259  		Password:                     o.Password,
   260  		CredentialsProvider:          o.CredentialsProvider,
   261  		CredentialsProviderContext:   o.CredentialsProviderContext,
   262  		StreamingCredentialsProvider: o.StreamingCredentialsProvider,
   263  
   264  		MaxRetries:      o.MaxRetries,
   265  		MinRetryBackoff: o.MinRetryBackoff,
   266  		MaxRetryBackoff: o.MaxRetryBackoff,
   267  
   268  		DialTimeout:           o.DialTimeout,
   269  		ReadTimeout:           o.ReadTimeout,
   270  		WriteTimeout:          o.WriteTimeout,
   271  		ContextTimeoutEnabled: o.ContextTimeoutEnabled,
   272  
   273  		ReadBufferSize:  o.ReadBufferSize,
   274  		WriteBufferSize: o.WriteBufferSize,
   275  
   276  		PoolFIFO:        o.PoolFIFO,
   277  		PoolSize:        o.PoolSize,
   278  		PoolTimeout:     o.PoolTimeout,
   279  		MinIdleConns:    o.MinIdleConns,
   280  		MaxIdleConns:    o.MaxIdleConns,
   281  		MaxActiveConns:  o.MaxActiveConns,
   282  		ConnMaxIdleTime: o.ConnMaxIdleTime,
   283  		ConnMaxLifetime: o.ConnMaxLifetime,
   284  
   285  		TLSConfig: o.TLSConfig,
   286  
   287  		DisableIdentity:  o.DisableIdentity,
   288  		DisableIndentity: o.DisableIndentity,
   289  		IdentitySuffix:   o.IdentitySuffix,
   290  		UnstableResp3:    o.UnstableResp3,
   291  	}
   292  }
   293  
   294  // --------------------------------------------------------------------
   295  
   296  // UniversalClient is an abstract client which - based on the provided options -
   297  // represents either a ClusterClient, a FailoverClient, or a single-node Client.
   298  // This can be useful for testing cluster-specific applications locally or having different
   299  // clients in different environments.
   300  type UniversalClient interface {
   301  	Cmdable
   302  	AddHook(Hook)
   303  	Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error
   304  	Do(ctx context.Context, args ...interface{}) *Cmd
   305  	Process(ctx context.Context, cmd Cmder) error
   306  	Subscribe(ctx context.Context, channels ...string) *PubSub
   307  	PSubscribe(ctx context.Context, channels ...string) *PubSub
   308  	SSubscribe(ctx context.Context, channels ...string) *PubSub
   309  	Close() error
   310  	PoolStats() *PoolStats
   311  }
   312  
   313  var (
   314  	_ UniversalClient = (*Client)(nil)
   315  	_ UniversalClient = (*ClusterClient)(nil)
   316  	_ UniversalClient = (*Ring)(nil)
   317  )
   318  
   319  // NewUniversalClient returns a new multi client. The type of the returned client depends
   320  // on the following conditions:
   321  //
   322  //  1. If the MasterName option is specified with RouteByLatency, RouteRandomly or IsClusterMode,
   323  //     a FailoverClusterClient is returned.
   324  //  2. If the MasterName option is specified without RouteByLatency, RouteRandomly or IsClusterMode,
   325  //     a sentinel-backed FailoverClient is returned.
   326  //  3. If the number of Addrs is two or more, or IsClusterMode option is specified,
   327  //     a ClusterClient is returned.
   328  //  4. Otherwise, a single-node Client is returned.
   329  func NewUniversalClient(opts *UniversalOptions) UniversalClient {
   330  	if opts == nil {
   331  		panic("redis: NewUniversalClient nil options")
   332  	}
   333  
   334  	switch {
   335  	case opts.MasterName != "" && (opts.RouteByLatency || opts.RouteRandomly || opts.IsClusterMode):
   336  		return NewFailoverClusterClient(opts.Failover())
   337  	case opts.MasterName != "":
   338  		return NewFailoverClient(opts.Failover())
   339  	case len(opts.Addrs) > 1 || opts.IsClusterMode:
   340  		return NewClusterClient(opts.Cluster())
   341  	default:
   342  		return NewClient(opts.Simple())
   343  	}
   344  }
   345  

View as plain text