1 package redis_test
2
3 import (
4 "bytes"
5 "fmt"
6 "net"
7 "strconv"
8 "sync/atomic"
9 "testing"
10 "time"
11
12 . "github.com/bsm/ginkgo/v2"
13 . "github.com/bsm/gomega"
14
15 "github.com/redis/go-redis/v9"
16 )
17
18 var _ = Describe("races", func() {
19 var client *redis.Client
20 var C, N int
21
22 BeforeEach(func() {
23 client = redis.NewClient(redisOptions())
24 Expect(client.FlushDB(ctx).Err()).To(BeNil())
25
26 C, N = 10, 1000
27 if testing.Short() {
28 C = 4
29 N = 100
30 }
31 })
32
33 AfterEach(func() {
34 err := client.Close()
35 Expect(err).NotTo(HaveOccurred())
36 })
37
38 It("should echo", func() {
39 perform(C, func(id int) {
40 for i := 0; i < N; i++ {
41 msg := fmt.Sprintf("echo %d %d", id, i)
42 echo, err := client.Echo(ctx, msg).Result()
43 Expect(err).NotTo(HaveOccurred())
44 Expect(echo).To(Equal(msg))
45 }
46 })
47 })
48
49 It("should incr", func() {
50 key := "TestIncrFromGoroutines"
51
52 perform(C, func(id int) {
53 for i := 0; i < N; i++ {
54 err := client.Incr(ctx, key).Err()
55 Expect(err).NotTo(HaveOccurred())
56 }
57 })
58
59 val, err := client.Get(ctx, key).Int64()
60 Expect(err).NotTo(HaveOccurred())
61 Expect(val).To(Equal(int64(C * N)))
62 })
63
64 It("should handle many keys", func() {
65 perform(C, func(id int) {
66 for i := 0; i < N; i++ {
67 err := client.Set(
68 ctx,
69 fmt.Sprintf("keys.key-%d-%d", id, i),
70 fmt.Sprintf("hello-%d-%d", id, i),
71 0,
72 ).Err()
73 Expect(err).NotTo(HaveOccurred())
74 }
75 })
76
77 keys := client.Keys(ctx, "keys.*")
78 Expect(keys.Err()).NotTo(HaveOccurred())
79 Expect(len(keys.Val())).To(Equal(C * N))
80 })
81
82 It("should handle many keys 2", func() {
83 perform(C, func(id int) {
84 keys := []string{"non-existent-key"}
85 for i := 0; i < N; i++ {
86 key := fmt.Sprintf("keys.key-%d", i)
87 keys = append(keys, key)
88
89 err := client.Set(ctx, key, fmt.Sprintf("hello-%d", i), 0).Err()
90 Expect(err).NotTo(HaveOccurred())
91 }
92 keys = append(keys, "non-existent-key")
93
94 vals, err := client.MGet(ctx, keys...).Result()
95 Expect(err).NotTo(HaveOccurred())
96 Expect(len(vals)).To(Equal(N + 2))
97
98 for i := 0; i < N; i++ {
99 Expect(vals[i+1]).To(Equal(fmt.Sprintf("hello-%d", i)))
100 }
101
102 Expect(vals[0]).To(BeNil())
103 Expect(vals[N+1]).To(BeNil())
104 })
105 })
106
107 It("should handle big vals in Get", func() {
108 C, N := 4, 100
109
110 bigVal := bigVal()
111
112 err := client.Set(ctx, "key", bigVal, 0).Err()
113 Expect(err).NotTo(HaveOccurred())
114
115
116 Expect(client.Close()).To(BeNil())
117 client = redis.NewClient(redisOptions())
118
119 perform(C, func(id int) {
120 for i := 0; i < N; i++ {
121 got, err := client.Get(ctx, "key").Bytes()
122 Expect(err).NotTo(HaveOccurred())
123 Expect(got).To(Equal(bigVal))
124 }
125 })
126 })
127
128 It("should handle big vals in Set", func() {
129 C, N := 4, 100
130
131 bigVal := bigVal()
132 perform(C, func(id int) {
133 for i := 0; i < N; i++ {
134 err := client.Set(ctx, "key", bigVal, 0).Err()
135 Expect(err).NotTo(HaveOccurred())
136 }
137 })
138 })
139
140 It("should select db", Label("NonRedisEnterprise"), func() {
141 err := client.Set(ctx, "db", 0, 0).Err()
142 Expect(err).NotTo(HaveOccurred())
143
144 perform(C, func(id int) {
145 opt := redisOptions()
146 opt.DB = id
147 client := redis.NewClient(opt)
148 for i := 0; i < N; i++ {
149 err := client.Set(ctx, "db", id, 0).Err()
150 Expect(err).NotTo(HaveOccurred())
151
152 n, err := client.Get(ctx, "db").Int64()
153 Expect(err).NotTo(HaveOccurred())
154 Expect(n).To(Equal(int64(id)))
155 }
156 err := client.Close()
157 Expect(err).NotTo(HaveOccurred())
158 })
159
160 n, err := client.Get(ctx, "db").Int64()
161 Expect(err).NotTo(HaveOccurred())
162 Expect(n).To(Equal(int64(0)))
163 })
164
165 It("should select DB with read timeout", func() {
166 perform(C, func(id int) {
167 opt := redisOptions()
168 opt.DB = id
169 opt.ReadTimeout = time.Nanosecond
170 client := redis.NewClient(opt)
171
172 perform(C, func(id int) {
173 err := client.Ping(ctx).Err()
174 Expect(err).To(HaveOccurred())
175 Expect(err.(net.Error).Timeout()).To(BeTrue())
176 })
177
178 err := client.Close()
179 Expect(err).NotTo(HaveOccurred())
180 })
181 })
182
183 It("should Watch/Unwatch", func() {
184 err := client.Set(ctx, "key", "0", 0).Err()
185 Expect(err).NotTo(HaveOccurred())
186
187 perform(C, func(id int) {
188 for i := 0; i < N; i++ {
189 err := client.Watch(ctx, func(tx *redis.Tx) error {
190 val, err := tx.Get(ctx, "key").Result()
191 Expect(err).NotTo(HaveOccurred())
192 Expect(val).NotTo(Equal(redis.Nil))
193
194 num, err := strconv.ParseInt(val, 10, 64)
195 Expect(err).NotTo(HaveOccurred())
196
197 cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
198 pipe.Set(ctx, "key", strconv.FormatInt(num+1, 10), 0)
199 return nil
200 })
201 Expect(cmds).To(HaveLen(1))
202 return err
203 }, "key")
204 if err == redis.TxFailedErr {
205 i--
206 continue
207 }
208 Expect(err).NotTo(HaveOccurred())
209 }
210 })
211
212 val, err := client.Get(ctx, "key").Int64()
213 Expect(err).NotTo(HaveOccurred())
214 Expect(val).To(Equal(int64(C * N)))
215 })
216
217 It("should BLPop", func() {
218 C := 5
219 N := 5
220 var received uint32
221
222 wg := performAsync(C, func(id int) {
223 for {
224 v, err := client.BLPop(ctx, time.Second, "list").Result()
225 if err != nil {
226 if err == redis.Nil {
227 break
228 }
229 Expect(err).NotTo(HaveOccurred())
230 }
231 Expect(v).To(Equal([]string{"list", "hello"}))
232 atomic.AddUint32(&received, 1)
233 }
234 })
235
236 perform(C, func(id int) {
237 for i := 0; i < N; i++ {
238 err := client.LPush(ctx, "list", "hello").Err()
239 Expect(err).NotTo(HaveOccurred())
240 }
241 })
242
243 wg.Wait()
244 Expect(atomic.LoadUint32(&received)).To(Equal(uint32(C * N)))
245 })
246 })
247
248 var _ = Describe("cluster races", Label("NonRedisEnterprise"), func() {
249 var client *redis.ClusterClient
250 var C, N int
251
252 BeforeEach(func() {
253 opt := redisClusterOptions()
254 client = cluster.newClusterClient(ctx, opt)
255
256 C, N = 10, 1000
257 if testing.Short() {
258 C = 4
259 N = 100
260 }
261 })
262
263 AfterEach(func() {
264 err := client.Close()
265 Expect(err).NotTo(HaveOccurred())
266 })
267
268 It("should echo", func() {
269 perform(C, func(id int) {
270 for i := 0; i < N; i++ {
271 msg := fmt.Sprintf("echo %d %d", id, i)
272 echo, err := client.Echo(ctx, msg).Result()
273 Expect(err).NotTo(HaveOccurred())
274 Expect(echo).To(Equal(msg))
275 }
276 })
277 })
278
279 It("should get", func() {
280 perform(C, func(id int) {
281 for i := 0; i < N; i++ {
282 key := fmt.Sprintf("key_%d_%d", id, i)
283 _, err := client.Get(ctx, key).Result()
284 Expect(err).To(Equal(redis.Nil))
285 }
286 })
287 })
288
289 It("should incr", func() {
290 key := "TestIncrFromGoroutines"
291
292 perform(C, func(id int) {
293 for i := 0; i < N; i++ {
294 err := client.Incr(ctx, key).Err()
295 Expect(err).NotTo(HaveOccurred())
296 }
297 })
298
299 val, err := client.Get(ctx, key).Int64()
300 Expect(err).NotTo(HaveOccurred())
301 Expect(val).To(Equal(int64(C * N)))
302 })
303
304 It("write cmd data-race", func() {
305 pubsub := client.Subscribe(ctx)
306 defer pubsub.Close()
307
308 pubsub.Channel(redis.WithChannelHealthCheckInterval(time.Millisecond))
309 for i := 0; i < 100; i++ {
310 key := fmt.Sprintf("channel_%d", i)
311 pubsub.Subscribe(ctx, key)
312 pubsub.Unsubscribe(ctx, key)
313 }
314 })
315 })
316
317 func bigVal() []byte {
318 return bytes.Repeat([]byte{'*'}, 1<<17)
319 }
320
View as plain text