...

Source file src/github.com/redis/go-redis/v9/race_test.go

Documentation: github.com/redis/go-redis/v9

     1  package redis_test
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  	"net"
     7  	"strconv"
     8  	"sync/atomic"
     9  	"testing"
    10  	"time"
    11  
    12  	. "github.com/bsm/ginkgo/v2"
    13  	. "github.com/bsm/gomega"
    14  
    15  	"github.com/redis/go-redis/v9"
    16  )
    17  
    18  var _ = Describe("races", func() {
    19  	var client *redis.Client
    20  	var C, N int
    21  
    22  	BeforeEach(func() {
    23  		client = redis.NewClient(redisOptions())
    24  		Expect(client.FlushDB(ctx).Err()).To(BeNil())
    25  
    26  		C, N = 10, 1000
    27  		if testing.Short() {
    28  			C = 4
    29  			N = 100
    30  		}
    31  	})
    32  
    33  	AfterEach(func() {
    34  		err := client.Close()
    35  		Expect(err).NotTo(HaveOccurred())
    36  	})
    37  
    38  	It("should echo", func() {
    39  		perform(C, func(id int) {
    40  			for i := 0; i < N; i++ {
    41  				msg := fmt.Sprintf("echo %d %d", id, i)
    42  				echo, err := client.Echo(ctx, msg).Result()
    43  				Expect(err).NotTo(HaveOccurred())
    44  				Expect(echo).To(Equal(msg))
    45  			}
    46  		})
    47  	})
    48  
    49  	It("should incr", func() {
    50  		key := "TestIncrFromGoroutines"
    51  
    52  		perform(C, func(id int) {
    53  			for i := 0; i < N; i++ {
    54  				err := client.Incr(ctx, key).Err()
    55  				Expect(err).NotTo(HaveOccurred())
    56  			}
    57  		})
    58  
    59  		val, err := client.Get(ctx, key).Int64()
    60  		Expect(err).NotTo(HaveOccurred())
    61  		Expect(val).To(Equal(int64(C * N)))
    62  	})
    63  
    64  	It("should handle many keys", func() {
    65  		perform(C, func(id int) {
    66  			for i := 0; i < N; i++ {
    67  				err := client.Set(
    68  					ctx,
    69  					fmt.Sprintf("keys.key-%d-%d", id, i),
    70  					fmt.Sprintf("hello-%d-%d", id, i),
    71  					0,
    72  				).Err()
    73  				Expect(err).NotTo(HaveOccurred())
    74  			}
    75  		})
    76  
    77  		keys := client.Keys(ctx, "keys.*")
    78  		Expect(keys.Err()).NotTo(HaveOccurred())
    79  		Expect(len(keys.Val())).To(Equal(C * N))
    80  	})
    81  
    82  	It("should handle many keys 2", func() {
    83  		perform(C, func(id int) {
    84  			keys := []string{"non-existent-key"}
    85  			for i := 0; i < N; i++ {
    86  				key := fmt.Sprintf("keys.key-%d", i)
    87  				keys = append(keys, key)
    88  
    89  				err := client.Set(ctx, key, fmt.Sprintf("hello-%d", i), 0).Err()
    90  				Expect(err).NotTo(HaveOccurred())
    91  			}
    92  			keys = append(keys, "non-existent-key")
    93  
    94  			vals, err := client.MGet(ctx, keys...).Result()
    95  			Expect(err).NotTo(HaveOccurred())
    96  			Expect(len(vals)).To(Equal(N + 2))
    97  
    98  			for i := 0; i < N; i++ {
    99  				Expect(vals[i+1]).To(Equal(fmt.Sprintf("hello-%d", i)))
   100  			}
   101  
   102  			Expect(vals[0]).To(BeNil())
   103  			Expect(vals[N+1]).To(BeNil())
   104  		})
   105  	})
   106  
   107  	It("should handle big vals in Get", func() {
   108  		C, N := 4, 100
   109  
   110  		bigVal := bigVal()
   111  
   112  		err := client.Set(ctx, "key", bigVal, 0).Err()
   113  		Expect(err).NotTo(HaveOccurred())
   114  
   115  		// Reconnect to get new connection.
   116  		Expect(client.Close()).To(BeNil())
   117  		client = redis.NewClient(redisOptions())
   118  
   119  		perform(C, func(id int) {
   120  			for i := 0; i < N; i++ {
   121  				got, err := client.Get(ctx, "key").Bytes()
   122  				Expect(err).NotTo(HaveOccurred())
   123  				Expect(got).To(Equal(bigVal))
   124  			}
   125  		})
   126  	})
   127  
   128  	It("should handle big vals in Set", func() {
   129  		C, N := 4, 100
   130  
   131  		bigVal := bigVal()
   132  		perform(C, func(id int) {
   133  			for i := 0; i < N; i++ {
   134  				err := client.Set(ctx, "key", bigVal, 0).Err()
   135  				Expect(err).NotTo(HaveOccurred())
   136  			}
   137  		})
   138  	})
   139  
   140  	It("should select db", Label("NonRedisEnterprise"), func() {
   141  		err := client.Set(ctx, "db", 0, 0).Err()
   142  		Expect(err).NotTo(HaveOccurred())
   143  
   144  		perform(C, func(id int) {
   145  			opt := redisOptions()
   146  			opt.DB = id
   147  			client := redis.NewClient(opt)
   148  			for i := 0; i < N; i++ {
   149  				err := client.Set(ctx, "db", id, 0).Err()
   150  				Expect(err).NotTo(HaveOccurred())
   151  
   152  				n, err := client.Get(ctx, "db").Int64()
   153  				Expect(err).NotTo(HaveOccurred())
   154  				Expect(n).To(Equal(int64(id)))
   155  			}
   156  			err := client.Close()
   157  			Expect(err).NotTo(HaveOccurred())
   158  		})
   159  
   160  		n, err := client.Get(ctx, "db").Int64()
   161  		Expect(err).NotTo(HaveOccurred())
   162  		Expect(n).To(Equal(int64(0)))
   163  	})
   164  
   165  	It("should select DB with read timeout", func() {
   166  		perform(C, func(id int) {
   167  			opt := redisOptions()
   168  			opt.DB = id
   169  			opt.ReadTimeout = time.Nanosecond
   170  			client := redis.NewClient(opt)
   171  
   172  			perform(C, func(id int) {
   173  				err := client.Ping(ctx).Err()
   174  				Expect(err).To(HaveOccurred())
   175  				Expect(err.(net.Error).Timeout()).To(BeTrue())
   176  			})
   177  
   178  			err := client.Close()
   179  			Expect(err).NotTo(HaveOccurred())
   180  		})
   181  	})
   182  
   183  	It("should Watch/Unwatch", func() {
   184  		err := client.Set(ctx, "key", "0", 0).Err()
   185  		Expect(err).NotTo(HaveOccurred())
   186  
   187  		perform(C, func(id int) {
   188  			for i := 0; i < N; i++ {
   189  				err := client.Watch(ctx, func(tx *redis.Tx) error {
   190  					val, err := tx.Get(ctx, "key").Result()
   191  					Expect(err).NotTo(HaveOccurred())
   192  					Expect(val).NotTo(Equal(redis.Nil))
   193  
   194  					num, err := strconv.ParseInt(val, 10, 64)
   195  					Expect(err).NotTo(HaveOccurred())
   196  
   197  					cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
   198  						pipe.Set(ctx, "key", strconv.FormatInt(num+1, 10), 0)
   199  						return nil
   200  					})
   201  					Expect(cmds).To(HaveLen(1))
   202  					return err
   203  				}, "key")
   204  				if err == redis.TxFailedErr {
   205  					i--
   206  					continue
   207  				}
   208  				Expect(err).NotTo(HaveOccurred())
   209  			}
   210  		})
   211  
   212  		val, err := client.Get(ctx, "key").Int64()
   213  		Expect(err).NotTo(HaveOccurred())
   214  		Expect(val).To(Equal(int64(C * N)))
   215  	})
   216  
   217  	It("should BLPop", func() {
   218  		C := 5
   219  		N := 5
   220  		var received uint32
   221  
   222  		wg := performAsync(C, func(id int) {
   223  			for {
   224  				v, err := client.BLPop(ctx, time.Second, "list").Result()
   225  				if err != nil {
   226  					if err == redis.Nil {
   227  						break
   228  					}
   229  					Expect(err).NotTo(HaveOccurred())
   230  				}
   231  				Expect(v).To(Equal([]string{"list", "hello"}))
   232  				atomic.AddUint32(&received, 1)
   233  			}
   234  		})
   235  
   236  		perform(C, func(id int) {
   237  			for i := 0; i < N; i++ {
   238  				err := client.LPush(ctx, "list", "hello").Err()
   239  				Expect(err).NotTo(HaveOccurred())
   240  			}
   241  		})
   242  
   243  		wg.Wait()
   244  		Expect(atomic.LoadUint32(&received)).To(Equal(uint32(C * N)))
   245  	})
   246  })
   247  
   248  var _ = Describe("cluster races", Label("NonRedisEnterprise"), func() {
   249  	var client *redis.ClusterClient
   250  	var C, N int
   251  
   252  	BeforeEach(func() {
   253  		opt := redisClusterOptions()
   254  		client = cluster.newClusterClient(ctx, opt)
   255  
   256  		C, N = 10, 1000
   257  		if testing.Short() {
   258  			C = 4
   259  			N = 100
   260  		}
   261  	})
   262  
   263  	AfterEach(func() {
   264  		err := client.Close()
   265  		Expect(err).NotTo(HaveOccurred())
   266  	})
   267  
   268  	It("should echo", func() {
   269  		perform(C, func(id int) {
   270  			for i := 0; i < N; i++ {
   271  				msg := fmt.Sprintf("echo %d %d", id, i)
   272  				echo, err := client.Echo(ctx, msg).Result()
   273  				Expect(err).NotTo(HaveOccurred())
   274  				Expect(echo).To(Equal(msg))
   275  			}
   276  		})
   277  	})
   278  
   279  	It("should get", func() {
   280  		perform(C, func(id int) {
   281  			for i := 0; i < N; i++ {
   282  				key := fmt.Sprintf("key_%d_%d", id, i)
   283  				_, err := client.Get(ctx, key).Result()
   284  				Expect(err).To(Equal(redis.Nil))
   285  			}
   286  		})
   287  	})
   288  
   289  	It("should incr", func() {
   290  		key := "TestIncrFromGoroutines"
   291  
   292  		perform(C, func(id int) {
   293  			for i := 0; i < N; i++ {
   294  				err := client.Incr(ctx, key).Err()
   295  				Expect(err).NotTo(HaveOccurred())
   296  			}
   297  		})
   298  
   299  		val, err := client.Get(ctx, key).Int64()
   300  		Expect(err).NotTo(HaveOccurred())
   301  		Expect(val).To(Equal(int64(C * N)))
   302  	})
   303  
   304  	It("write cmd data-race", func() {
   305  		pubsub := client.Subscribe(ctx)
   306  		defer pubsub.Close()
   307  
   308  		pubsub.Channel(redis.WithChannelHealthCheckInterval(time.Millisecond))
   309  		for i := 0; i < 100; i++ {
   310  			key := fmt.Sprintf("channel_%d", i)
   311  			pubsub.Subscribe(ctx, key)
   312  			pubsub.Unsubscribe(ctx, key)
   313  		}
   314  	})
   315  })
   316  
   317  func bigVal() []byte {
   318  	return bytes.Repeat([]byte{'*'}, 1<<17) // 128kb
   319  }
   320  

View as plain text