1 package redis_test
2
3 import (
4 "fmt"
5 "net"
6 "os"
7 "strconv"
8 "strings"
9 "sync"
10 "testing"
11 "time"
12
13 . "github.com/bsm/ginkgo/v2"
14 . "github.com/bsm/gomega"
15 "github.com/redis/go-redis/v9"
16 )
17
18 const (
19 redisSecondaryPort = "6381"
20 )
21
22 const (
23 ringShard1Port = "6390"
24 ringShard2Port = "6391"
25 ringShard3Port = "6392"
26 )
27
28 const (
29 sentinelName = "go-redis-test"
30 sentinelMasterPort = "9121"
31 sentinelSlave1Port = "9122"
32 sentinelSlave2Port = "9123"
33 sentinelPort1 = "26379"
34 sentinelPort2 = "26380"
35 sentinelPort3 = "26381"
36 )
37
38 var (
39 redisPort = "6380"
40 redisAddr = ":" + redisPort
41 )
42
43 var (
44 redisStackPort = "6379"
45 redisStackAddr = ":" + redisStackPort
46 )
47
48 var (
49 sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
50
51 ringShard1, ringShard2, ringShard3 *redis.Client
52 sentinelMaster, sentinelSlave1, sentinelSlave2 *redis.Client
53 sentinel1, sentinel2, sentinel3 *redis.Client
54 )
55
56 var cluster = &clusterScenario{
57 ports: []string{"16600", "16601", "16602", "16603", "16604", "16605"},
58 nodeIDs: make([]string, 6),
59 clients: make(map[string]*redis.Client, 6),
60 }
61
62
63 var RECluster = false
64
65
66 var RCEDocker = false
67
68
69
70
71 var RedisVersion float64 = 8.2
72
73 func SkipBeforeRedisVersion(version float64, msg string) {
74 if RedisVersion < version {
75 Skip(fmt.Sprintf("(redis version < %f) %s", version, msg))
76 }
77 }
78
79 func SkipAfterRedisVersion(version float64, msg string) {
80 if RedisVersion > version {
81 Skip(fmt.Sprintf("(redis version > %f) %s", version, msg))
82 }
83 }
84
85 var _ = BeforeSuite(func() {
86 addr := os.Getenv("REDIS_PORT")
87 if addr != "" {
88 redisPort = addr
89 redisAddr = ":" + redisPort
90 }
91 var err error
92 RECluster, _ = strconv.ParseBool(os.Getenv("RE_CLUSTER"))
93 RCEDocker, _ = strconv.ParseBool(os.Getenv("RCE_DOCKER"))
94
95 RedisVersion, _ = strconv.ParseFloat(strings.Trim(os.Getenv("REDIS_VERSION"), "\""), 64)
96
97 if RedisVersion == 0 {
98 RedisVersion = 8.2
99 }
100
101 fmt.Printf("RECluster: %v\n", RECluster)
102 fmt.Printf("RCEDocker: %v\n", RCEDocker)
103 fmt.Printf("REDIS_VERSION: %.1f\n", RedisVersion)
104 fmt.Printf("CLIENT_LIBS_TEST_IMAGE: %v\n", os.Getenv("CLIENT_LIBS_TEST_IMAGE"))
105
106 if RedisVersion < 7.0 || RedisVersion > 9 {
107 panic("incorrect or not supported redis version")
108 }
109
110 redisPort = redisStackPort
111 redisAddr = redisStackAddr
112 if !RECluster {
113 ringShard1, err = connectTo(ringShard1Port)
114 Expect(err).NotTo(HaveOccurred())
115
116 ringShard2, err = connectTo(ringShard2Port)
117 Expect(err).NotTo(HaveOccurred())
118
119 ringShard3, err = connectTo(ringShard3Port)
120 Expect(err).NotTo(HaveOccurred())
121
122 sentinelMaster, err = connectTo(sentinelMasterPort)
123 Expect(err).NotTo(HaveOccurred())
124
125 sentinel1, err = startSentinel(sentinelPort1, sentinelName, sentinelMasterPort)
126 Expect(err).NotTo(HaveOccurred())
127
128 sentinel2, err = startSentinel(sentinelPort2, sentinelName, sentinelMasterPort)
129 Expect(err).NotTo(HaveOccurred())
130
131 sentinel3, err = startSentinel(sentinelPort3, sentinelName, sentinelMasterPort)
132 Expect(err).NotTo(HaveOccurred())
133
134 sentinelSlave1, err = connectTo(sentinelSlave1Port)
135 Expect(err).NotTo(HaveOccurred())
136
137 err = sentinelSlave1.SlaveOf(ctx, "127.0.0.1", sentinelMasterPort).Err()
138 Expect(err).NotTo(HaveOccurred())
139
140 sentinelSlave2, err = connectTo(sentinelSlave2Port)
141 Expect(err).NotTo(HaveOccurred())
142
143 err = sentinelSlave2.SlaveOf(ctx, "127.0.0.1", sentinelMasterPort).Err()
144 Expect(err).NotTo(HaveOccurred())
145
146
147 Expect(configureClusterTopology(ctx, cluster)).NotTo(HaveOccurred())
148 }
149 })
150
151 var _ = AfterSuite(func() {
152 if !RECluster {
153 Expect(cluster.Close()).NotTo(HaveOccurred())
154 }
155 })
156
157 func TestGinkgoSuite(t *testing.T) {
158 RegisterFailHandler(Fail)
159 RunSpecs(t, "go-redis")
160 }
161
162
163
164 func redisOptions() *redis.Options {
165 if RECluster {
166 return &redis.Options{
167 Addr: redisAddr,
168 DB: 0,
169
170 DialTimeout: 10 * time.Second,
171 ReadTimeout: 30 * time.Second,
172 WriteTimeout: 30 * time.Second,
173 ContextTimeoutEnabled: true,
174
175 MaxRetries: -1,
176
177 PoolSize: 10,
178 PoolTimeout: 30 * time.Second,
179 ConnMaxIdleTime: time.Minute,
180 }
181 }
182 return &redis.Options{
183 Addr: redisAddr,
184 DB: 0,
185
186 DialTimeout: 10 * time.Second,
187 ReadTimeout: 30 * time.Second,
188 WriteTimeout: 30 * time.Second,
189 ContextTimeoutEnabled: true,
190
191 MaxRetries: -1,
192
193 PoolSize: 10,
194 PoolTimeout: 30 * time.Second,
195 ConnMaxIdleTime: time.Minute,
196 }
197 }
198
199 func redisClusterOptions() *redis.ClusterOptions {
200 return &redis.ClusterOptions{
201 DialTimeout: 10 * time.Second,
202 ReadTimeout: 30 * time.Second,
203 WriteTimeout: 30 * time.Second,
204
205 MaxRedirects: 8,
206
207 PoolSize: 10,
208 PoolTimeout: 30 * time.Second,
209 ConnMaxIdleTime: time.Minute,
210 }
211 }
212
213 func redisRingOptions() *redis.RingOptions {
214 return &redis.RingOptions{
215 Addrs: map[string]string{
216 "ringShardOne": ":" + ringShard1Port,
217 "ringShardTwo": ":" + ringShard2Port,
218 },
219
220 DialTimeout: 10 * time.Second,
221 ReadTimeout: 30 * time.Second,
222 WriteTimeout: 30 * time.Second,
223
224 MaxRetries: -1,
225
226 PoolSize: 10,
227 PoolTimeout: 30 * time.Second,
228 ConnMaxIdleTime: time.Minute,
229 }
230 }
231
232 func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
233 var wg sync.WaitGroup
234 for _, cb := range cbs {
235 wg.Add(n)
236
237
238 for i := 1; i <= n; i++ {
239 go func(cb func(int), i int) {
240 defer GinkgoRecover()
241 defer wg.Done()
242
243 cb(i)
244 }(cb, i)
245 }
246 }
247 return &wg
248 }
249
250 func perform(n int, cbs ...func(int)) {
251 wg := performAsync(n, cbs...)
252 wg.Wait()
253 }
254
255 func eventually(fn func() error, timeout time.Duration) error {
256 errCh := make(chan error, 1)
257 done := make(chan struct{})
258 exit := make(chan struct{})
259
260 go func() {
261 for {
262 err := fn()
263 if err == nil {
264 close(done)
265 return
266 }
267
268 select {
269 case errCh <- err:
270 default:
271 }
272
273 select {
274 case <-exit:
275 return
276 case <-time.After(timeout / 100):
277 }
278 }
279 }()
280
281 select {
282 case <-done:
283 return nil
284 case <-time.After(timeout):
285 close(exit)
286 select {
287 case err := <-errCh:
288 return err
289 default:
290 return fmt.Errorf("timeout after %s without an error", timeout)
291 }
292 }
293 }
294
295 func connectTo(port string) (*redis.Client, error) {
296 client := redis.NewClient(&redis.Options{
297 Addr: ":" + port,
298 MaxRetries: -1,
299 })
300
301 err := eventually(func() error {
302 return client.Ping(ctx).Err()
303 }, 30*time.Second)
304 if err != nil {
305 return nil, err
306 }
307
308 return client, nil
309 }
310
311 func startSentinel(port, masterName, masterPort string) (*redis.Client, error) {
312 client, err := connectTo(port)
313 if err != nil {
314 return nil, err
315 }
316
317 for _, cmd := range []*redis.StatusCmd{
318 redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
319 } {
320 client.Process(ctx, cmd)
321 if err := cmd.Err(); err != nil && !strings.Contains(err.Error(), "ERR Duplicate master name.") {
322 return nil, fmt.Errorf("%s failed: %w", cmd, err)
323 }
324 }
325
326 return client, nil
327 }
328
329
330
331 type badConnError string
332
333 func (e badConnError) Error() string { return string(e) }
334 func (e badConnError) Timeout() bool { return true }
335 func (e badConnError) Temporary() bool { return false }
336
337 type badConn struct {
338 net.TCPConn
339
340 readDelay, writeDelay time.Duration
341 readErr, writeErr error
342 }
343
344 var _ net.Conn = &badConn{}
345
346 func (cn *badConn) SetReadDeadline(t time.Time) error {
347 return nil
348 }
349
350 func (cn *badConn) SetWriteDeadline(t time.Time) error {
351 return nil
352 }
353
354 func (cn *badConn) Read([]byte) (int, error) {
355 if cn.readDelay != 0 {
356 time.Sleep(cn.readDelay)
357 }
358 if cn.readErr != nil {
359 return 0, cn.readErr
360 }
361 return 0, badConnError("bad connection")
362 }
363
364 func (cn *badConn) Write([]byte) (int, error) {
365 if cn.writeDelay != 0 {
366 time.Sleep(cn.writeDelay)
367 }
368 if cn.writeErr != nil {
369 return 0, cn.writeErr
370 }
371 return 0, badConnError("bad connection")
372 }
373
374
375
376 type hook struct {
377 dialHook func(hook redis.DialHook) redis.DialHook
378 processHook func(hook redis.ProcessHook) redis.ProcessHook
379 processPipelineHook func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook
380 }
381
382 func (h *hook) DialHook(hook redis.DialHook) redis.DialHook {
383 if h.dialHook != nil {
384 return h.dialHook(hook)
385 }
386 return hook
387 }
388
389 func (h *hook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
390 if h.processHook != nil {
391 return h.processHook(hook)
392 }
393 return hook
394 }
395
396 func (h *hook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
397 if h.processPipelineHook != nil {
398 return h.processPipelineHook(hook)
399 }
400 return hook
401 }
402
View as plain text