...

Source file src/github.com/redis/go-redis/v9/main_test.go

Documentation: github.com/redis/go-redis/v9

     1  package redis_test
     2  
     3  import (
     4  	"fmt"
     5  	"net"
     6  	"os"
     7  	"strconv"
     8  	"strings"
     9  	"sync"
    10  	"testing"
    11  	"time"
    12  
    13  	. "github.com/bsm/ginkgo/v2"
    14  	. "github.com/bsm/gomega"
    15  	"github.com/redis/go-redis/v9"
    16  )
    17  
    18  const (
    19  	redisSecondaryPort = "6381"
    20  )
    21  
    22  const (
    23  	ringShard1Port = "6390"
    24  	ringShard2Port = "6391"
    25  	ringShard3Port = "6392"
    26  )
    27  
    28  const (
    29  	sentinelName       = "go-redis-test"
    30  	sentinelMasterPort = "9121"
    31  	sentinelSlave1Port = "9122"
    32  	sentinelSlave2Port = "9123"
    33  	sentinelPort1      = "26379"
    34  	sentinelPort2      = "26380"
    35  	sentinelPort3      = "26381"
    36  )
    37  
    38  var (
    39  	redisPort = "6380"
    40  	redisAddr = ":" + redisPort
    41  )
    42  
    43  var (
    44  	redisStackPort = "6379"
    45  	redisStackAddr = ":" + redisStackPort
    46  )
    47  
    48  var (
    49  	sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
    50  
    51  	ringShard1, ringShard2, ringShard3             *redis.Client
    52  	sentinelMaster, sentinelSlave1, sentinelSlave2 *redis.Client
    53  	sentinel1, sentinel2, sentinel3                *redis.Client
    54  )
    55  
    56  var cluster = &clusterScenario{
    57  	ports:   []string{"16600", "16601", "16602", "16603", "16604", "16605"},
    58  	nodeIDs: make([]string, 6),
    59  	clients: make(map[string]*redis.Client, 6),
    60  }
    61  
    62  // Redis Software Cluster
    63  var RECluster = false
    64  
    65  // Redis Community Edition Docker
    66  var RCEDocker = false
    67  
    68  // Notes version of redis we are executing tests against.
    69  // This can be used before we change the bsm fork of ginkgo for one,
    70  // which have support for label sets, so we can filter tests per redis version.
    71  var RedisVersion float64 = 8.2
    72  
    73  func SkipBeforeRedisVersion(version float64, msg string) {
    74  	if RedisVersion < version {
    75  		Skip(fmt.Sprintf("(redis version < %f) %s", version, msg))
    76  	}
    77  }
    78  
    79  func SkipAfterRedisVersion(version float64, msg string) {
    80  	if RedisVersion > version {
    81  		Skip(fmt.Sprintf("(redis version > %f) %s", version, msg))
    82  	}
    83  }
    84  
    85  var _ = BeforeSuite(func() {
    86  	addr := os.Getenv("REDIS_PORT")
    87  	if addr != "" {
    88  		redisPort = addr
    89  		redisAddr = ":" + redisPort
    90  	}
    91  	var err error
    92  	RECluster, _ = strconv.ParseBool(os.Getenv("RE_CLUSTER"))
    93  	RCEDocker, _ = strconv.ParseBool(os.Getenv("RCE_DOCKER"))
    94  
    95  	RedisVersion, _ = strconv.ParseFloat(strings.Trim(os.Getenv("REDIS_VERSION"), "\""), 64)
    96  
    97  	if RedisVersion == 0 {
    98  		RedisVersion = 8.2
    99  	}
   100  
   101  	fmt.Printf("RECluster: %v\n", RECluster)
   102  	fmt.Printf("RCEDocker: %v\n", RCEDocker)
   103  	fmt.Printf("REDIS_VERSION: %.1f\n", RedisVersion)
   104  	fmt.Printf("CLIENT_LIBS_TEST_IMAGE: %v\n", os.Getenv("CLIENT_LIBS_TEST_IMAGE"))
   105  
   106  	if RedisVersion < 7.0 || RedisVersion > 9 {
   107  		panic("incorrect or not supported redis version")
   108  	}
   109  
   110  	redisPort = redisStackPort
   111  	redisAddr = redisStackAddr
   112  	if !RECluster {
   113  		ringShard1, err = connectTo(ringShard1Port)
   114  		Expect(err).NotTo(HaveOccurred())
   115  
   116  		ringShard2, err = connectTo(ringShard2Port)
   117  		Expect(err).NotTo(HaveOccurred())
   118  
   119  		ringShard3, err = connectTo(ringShard3Port)
   120  		Expect(err).NotTo(HaveOccurred())
   121  
   122  		sentinelMaster, err = connectTo(sentinelMasterPort)
   123  		Expect(err).NotTo(HaveOccurred())
   124  
   125  		sentinel1, err = startSentinel(sentinelPort1, sentinelName, sentinelMasterPort)
   126  		Expect(err).NotTo(HaveOccurred())
   127  
   128  		sentinel2, err = startSentinel(sentinelPort2, sentinelName, sentinelMasterPort)
   129  		Expect(err).NotTo(HaveOccurred())
   130  
   131  		sentinel3, err = startSentinel(sentinelPort3, sentinelName, sentinelMasterPort)
   132  		Expect(err).NotTo(HaveOccurred())
   133  
   134  		sentinelSlave1, err = connectTo(sentinelSlave1Port)
   135  		Expect(err).NotTo(HaveOccurred())
   136  
   137  		err = sentinelSlave1.SlaveOf(ctx, "127.0.0.1", sentinelMasterPort).Err()
   138  		Expect(err).NotTo(HaveOccurred())
   139  
   140  		sentinelSlave2, err = connectTo(sentinelSlave2Port)
   141  		Expect(err).NotTo(HaveOccurred())
   142  
   143  		err = sentinelSlave2.SlaveOf(ctx, "127.0.0.1", sentinelMasterPort).Err()
   144  		Expect(err).NotTo(HaveOccurred())
   145  
   146  		// populate cluster node information
   147  		Expect(configureClusterTopology(ctx, cluster)).NotTo(HaveOccurred())
   148  	}
   149  })
   150  
   151  var _ = AfterSuite(func() {
   152  	if !RECluster {
   153  		Expect(cluster.Close()).NotTo(HaveOccurred())
   154  	}
   155  })
   156  
// TestGinkgoSuite is the `go test` entry point: it registers Fail as the
// failure handler and runs all specs under the suite name "go-redis".
func TestGinkgoSuite(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "go-redis")
}
   161  
   162  //------------------------------------------------------------------------------
   163  
   164  func redisOptions() *redis.Options {
   165  	if RECluster {
   166  		return &redis.Options{
   167  			Addr: redisAddr,
   168  			DB:   0,
   169  
   170  			DialTimeout:           10 * time.Second,
   171  			ReadTimeout:           30 * time.Second,
   172  			WriteTimeout:          30 * time.Second,
   173  			ContextTimeoutEnabled: true,
   174  
   175  			MaxRetries: -1,
   176  
   177  			PoolSize:        10,
   178  			PoolTimeout:     30 * time.Second,
   179  			ConnMaxIdleTime: time.Minute,
   180  		}
   181  	}
   182  	return &redis.Options{
   183  		Addr: redisAddr,
   184  		DB:   0,
   185  
   186  		DialTimeout:           10 * time.Second,
   187  		ReadTimeout:           30 * time.Second,
   188  		WriteTimeout:          30 * time.Second,
   189  		ContextTimeoutEnabled: true,
   190  
   191  		MaxRetries: -1,
   192  
   193  		PoolSize:        10,
   194  		PoolTimeout:     30 * time.Second,
   195  		ConnMaxIdleTime: time.Minute,
   196  	}
   197  }
   198  
   199  func redisClusterOptions() *redis.ClusterOptions {
   200  	return &redis.ClusterOptions{
   201  		DialTimeout:  10 * time.Second,
   202  		ReadTimeout:  30 * time.Second,
   203  		WriteTimeout: 30 * time.Second,
   204  
   205  		MaxRedirects: 8,
   206  
   207  		PoolSize:        10,
   208  		PoolTimeout:     30 * time.Second,
   209  		ConnMaxIdleTime: time.Minute,
   210  	}
   211  }
   212  
   213  func redisRingOptions() *redis.RingOptions {
   214  	return &redis.RingOptions{
   215  		Addrs: map[string]string{
   216  			"ringShardOne": ":" + ringShard1Port,
   217  			"ringShardTwo": ":" + ringShard2Port,
   218  		},
   219  
   220  		DialTimeout:  10 * time.Second,
   221  		ReadTimeout:  30 * time.Second,
   222  		WriteTimeout: 30 * time.Second,
   223  
   224  		MaxRetries: -1,
   225  
   226  		PoolSize:        10,
   227  		PoolTimeout:     30 * time.Second,
   228  		ConnMaxIdleTime: time.Minute,
   229  	}
   230  }
   231  
   232  func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
   233  	var wg sync.WaitGroup
   234  	for _, cb := range cbs {
   235  		wg.Add(n)
   236  		// start from 1, so we can skip db 0 where such test is executed with
   237  		// select db command
   238  		for i := 1; i <= n; i++ {
   239  			go func(cb func(int), i int) {
   240  				defer GinkgoRecover()
   241  				defer wg.Done()
   242  
   243  				cb(i)
   244  			}(cb, i)
   245  		}
   246  	}
   247  	return &wg
   248  }
   249  
   250  func perform(n int, cbs ...func(int)) {
   251  	wg := performAsync(n, cbs...)
   252  	wg.Wait()
   253  }
   254  
   255  func eventually(fn func() error, timeout time.Duration) error {
   256  	errCh := make(chan error, 1)
   257  	done := make(chan struct{})
   258  	exit := make(chan struct{})
   259  
   260  	go func() {
   261  		for {
   262  			err := fn()
   263  			if err == nil {
   264  				close(done)
   265  				return
   266  			}
   267  
   268  			select {
   269  			case errCh <- err:
   270  			default:
   271  			}
   272  
   273  			select {
   274  			case <-exit:
   275  				return
   276  			case <-time.After(timeout / 100):
   277  			}
   278  		}
   279  	}()
   280  
   281  	select {
   282  	case <-done:
   283  		return nil
   284  	case <-time.After(timeout):
   285  		close(exit)
   286  		select {
   287  		case err := <-errCh:
   288  			return err
   289  		default:
   290  			return fmt.Errorf("timeout after %s without an error", timeout)
   291  		}
   292  	}
   293  }
   294  
   295  func connectTo(port string) (*redis.Client, error) {
   296  	client := redis.NewClient(&redis.Options{
   297  		Addr:       ":" + port,
   298  		MaxRetries: -1,
   299  	})
   300  
   301  	err := eventually(func() error {
   302  		return client.Ping(ctx).Err()
   303  	}, 30*time.Second)
   304  	if err != nil {
   305  		return nil, err
   306  	}
   307  
   308  	return client, nil
   309  }
   310  
   311  func startSentinel(port, masterName, masterPort string) (*redis.Client, error) {
   312  	client, err := connectTo(port)
   313  	if err != nil {
   314  		return nil, err
   315  	}
   316  
   317  	for _, cmd := range []*redis.StatusCmd{
   318  		redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
   319  	} {
   320  		client.Process(ctx, cmd)
   321  		if err := cmd.Err(); err != nil && !strings.Contains(err.Error(), "ERR Duplicate master name.") {
   322  			return nil, fmt.Errorf("%s failed: %w", cmd, err)
   323  		}
   324  	}
   325  
   326  	return client, nil
   327  }
   328  
   329  //------------------------------------------------------------------------------
   330  
   331  type badConnError string
   332  
   333  func (e badConnError) Error() string   { return string(e) }
   334  func (e badConnError) Timeout() bool   { return true }
   335  func (e badConnError) Temporary() bool { return false }
   336  
   337  type badConn struct {
   338  	net.TCPConn
   339  
   340  	readDelay, writeDelay time.Duration
   341  	readErr, writeErr     error
   342  }
   343  
   344  var _ net.Conn = &badConn{}
   345  
   346  func (cn *badConn) SetReadDeadline(t time.Time) error {
   347  	return nil
   348  }
   349  
   350  func (cn *badConn) SetWriteDeadline(t time.Time) error {
   351  	return nil
   352  }
   353  
   354  func (cn *badConn) Read([]byte) (int, error) {
   355  	if cn.readDelay != 0 {
   356  		time.Sleep(cn.readDelay)
   357  	}
   358  	if cn.readErr != nil {
   359  		return 0, cn.readErr
   360  	}
   361  	return 0, badConnError("bad connection")
   362  }
   363  
   364  func (cn *badConn) Write([]byte) (int, error) {
   365  	if cn.writeDelay != 0 {
   366  		time.Sleep(cn.writeDelay)
   367  	}
   368  	if cn.writeErr != nil {
   369  		return 0, cn.writeErr
   370  	}
   371  	return 0, badConnError("bad connection")
   372  }
   373  
   374  //------------------------------------------------------------------------------
   375  
   376  type hook struct {
   377  	dialHook            func(hook redis.DialHook) redis.DialHook
   378  	processHook         func(hook redis.ProcessHook) redis.ProcessHook
   379  	processPipelineHook func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook
   380  }
   381  
   382  func (h *hook) DialHook(hook redis.DialHook) redis.DialHook {
   383  	if h.dialHook != nil {
   384  		return h.dialHook(hook)
   385  	}
   386  	return hook
   387  }
   388  
   389  func (h *hook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
   390  	if h.processHook != nil {
   391  		return h.processHook(hook)
   392  	}
   393  	return hook
   394  }
   395  
   396  func (h *hook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
   397  	if h.processPipelineHook != nil {
   398  		return h.processPipelineHook(hook)
   399  	}
   400  	return hook
   401  }
   402  

View as plain text