1 package redis_test
2
3 import (
4 "io"
5 "net"
6 "sync"
7 "time"
8
9 . "github.com/bsm/ginkgo/v2"
10 . "github.com/bsm/gomega"
11
12 "github.com/redis/go-redis/v9"
13 )
14
15 var _ = Describe("PubSub", func() {
16 var client *redis.Client
17
18 BeforeEach(func() {
19 opt := redisOptions()
20 opt.MinIdleConns = 0
21 opt.ConnMaxLifetime = 0
22 client = redis.NewClient(opt)
23 Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
24 })
25
26 AfterEach(func() {
27 Expect(client.Close()).NotTo(HaveOccurred())
28 })
29
30 It("implements Stringer", func() {
31 pubsub := client.PSubscribe(ctx, "mychannel*")
32 defer pubsub.Close()
33
34 Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
35 })
36
37 It("should support pattern matching", func() {
38 pubsub := client.PSubscribe(ctx, "mychannel*")
39 defer pubsub.Close()
40
41 {
42 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
43 Expect(err).NotTo(HaveOccurred())
44 subscr := msgi.(*redis.Subscription)
45 Expect(subscr.Kind).To(Equal("psubscribe"))
46 Expect(subscr.Channel).To(Equal("mychannel*"))
47 Expect(subscr.Count).To(Equal(1))
48 }
49
50 {
51 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
52 Expect(err.(net.Error).Timeout()).To(Equal(true))
53 Expect(msgi).To(BeNil())
54 }
55
56 n, err := client.Publish(ctx, "mychannel1", "hello").Result()
57 Expect(err).NotTo(HaveOccurred())
58 Expect(n).To(Equal(int64(1)))
59
60 Expect(pubsub.PUnsubscribe(ctx, "mychannel*")).NotTo(HaveOccurred())
61
62 {
63 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
64 Expect(err).NotTo(HaveOccurred())
65 subscr := msgi.(*redis.Message)
66 Expect(subscr.Channel).To(Equal("mychannel1"))
67 Expect(subscr.Pattern).To(Equal("mychannel*"))
68 Expect(subscr.Payload).To(Equal("hello"))
69 }
70
71 {
72 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
73 Expect(err).NotTo(HaveOccurred())
74 subscr := msgi.(*redis.Subscription)
75 Expect(subscr.Kind).To(Equal("punsubscribe"))
76 Expect(subscr.Channel).To(Equal("mychannel*"))
77 Expect(subscr.Count).To(Equal(0))
78 }
79
80 stats := client.PoolStats()
81 Expect(stats.Misses).To(Equal(uint32(1)))
82 })
83
84 It("should pub/sub channels", func() {
85 channels, err := client.PubSubChannels(ctx, "mychannel*").Result()
86 Expect(err).NotTo(HaveOccurred())
87 Expect(channels).To(BeEmpty())
88
89 pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
90 defer pubsub.Close()
91
92
93 time.Sleep(10 * time.Millisecond)
94
95 channels, err = client.PubSubChannels(ctx, "mychannel*").Result()
96 Expect(err).NotTo(HaveOccurred())
97 Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
98
99 channels, err = client.PubSubChannels(ctx, "").Result()
100 Expect(err).NotTo(HaveOccurred())
101 Expect(channels).To(BeEmpty())
102
103 channels, err = client.PubSubChannels(ctx, "*").Result()
104 Expect(err).NotTo(HaveOccurred())
105 Expect(len(channels)).To(BeNumerically(">=", 2))
106 })
107
108 It("should sharded pub/sub channels", func() {
109 channels, err := client.PubSubShardChannels(ctx, "mychannel*").Result()
110 Expect(err).NotTo(HaveOccurred())
111 Expect(channels).To(BeEmpty())
112
113 pubsub := client.SSubscribe(ctx, "mychannel", "mychannel2")
114 defer pubsub.Close()
115
116
117 time.Sleep(10 * time.Millisecond)
118
119 channels, err = client.PubSubShardChannels(ctx, "mychannel*").Result()
120 Expect(err).NotTo(HaveOccurred())
121 Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
122
123 channels, err = client.PubSubShardChannels(ctx, "").Result()
124 Expect(err).NotTo(HaveOccurred())
125 Expect(channels).To(BeEmpty())
126
127 channels, err = client.PubSubShardChannels(ctx, "*").Result()
128 Expect(err).NotTo(HaveOccurred())
129 Expect(len(channels)).To(BeNumerically(">=", 2))
130
131 nums, err := client.PubSubShardNumSub(ctx, "mychannel", "mychannel2", "mychannel3").Result()
132 Expect(err).NotTo(HaveOccurred())
133 Expect(nums).To(Equal(map[string]int64{
134 "mychannel": 1,
135 "mychannel2": 1,
136 "mychannel3": 0,
137 }))
138 })
139
140 It("should return the numbers of subscribers", func() {
141 pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
142 defer pubsub.Close()
143
144
145 time.Sleep(10 * time.Millisecond)
146 channels, err := client.PubSubNumSub(ctx, "mychannel", "mychannel2", "mychannel3").Result()
147 Expect(err).NotTo(HaveOccurred())
148 Expect(channels).To(Equal(map[string]int64{
149 "mychannel": 1,
150 "mychannel2": 1,
151 "mychannel3": 0,
152 }))
153 })
154
155 It("should return the numbers of subscribers by pattern", func() {
156 num, err := client.PubSubNumPat(ctx).Result()
157 Expect(err).NotTo(HaveOccurred())
158 Expect(num).To(Equal(int64(0)))
159
160 pubsub := client.PSubscribe(ctx, "*")
161 defer pubsub.Close()
162
163
164 time.Sleep(10 * time.Millisecond)
165 num, err = client.PubSubNumPat(ctx).Result()
166 Expect(err).NotTo(HaveOccurred())
167 Expect(num).To(Equal(int64(1)))
168 })
169
170 It("should pub/sub", func() {
171 pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
172 defer pubsub.Close()
173
174 {
175 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
176 Expect(err).NotTo(HaveOccurred())
177 subscr := msgi.(*redis.Subscription)
178 Expect(subscr.Kind).To(Equal("subscribe"))
179 Expect(subscr.Channel).To(Equal("mychannel"))
180 Expect(subscr.Count).To(Equal(1))
181 }
182
183 {
184 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
185 Expect(err).NotTo(HaveOccurred())
186 subscr := msgi.(*redis.Subscription)
187 Expect(subscr.Kind).To(Equal("subscribe"))
188 Expect(subscr.Channel).To(Equal("mychannel2"))
189 Expect(subscr.Count).To(Equal(2))
190 }
191
192 {
193 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
194 Expect(err.(net.Error).Timeout()).To(Equal(true))
195 Expect(msgi).NotTo(HaveOccurred())
196 }
197
198 n, err := client.Publish(ctx, "mychannel", "hello").Result()
199 Expect(err).NotTo(HaveOccurred())
200 Expect(n).To(Equal(int64(1)))
201
202 n, err = client.Publish(ctx, "mychannel2", "hello2").Result()
203 Expect(err).NotTo(HaveOccurred())
204 Expect(n).To(Equal(int64(1)))
205
206 Expect(pubsub.Unsubscribe(ctx, "mychannel", "mychannel2")).NotTo(HaveOccurred())
207
208 {
209 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
210 Expect(err).NotTo(HaveOccurred())
211 msg := msgi.(*redis.Message)
212 Expect(msg.Channel).To(Equal("mychannel"))
213 Expect(msg.Payload).To(Equal("hello"))
214 }
215
216 {
217 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
218 Expect(err).NotTo(HaveOccurred())
219 msg := msgi.(*redis.Message)
220 Expect(msg.Channel).To(Equal("mychannel2"))
221 Expect(msg.Payload).To(Equal("hello2"))
222 }
223
224 {
225 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
226 Expect(err).NotTo(HaveOccurred())
227 subscr := msgi.(*redis.Subscription)
228 Expect(subscr.Kind).To(Equal("unsubscribe"))
229 Expect(subscr.Channel).To(Equal("mychannel"))
230 Expect(subscr.Count).To(Equal(1))
231 }
232
233 {
234 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
235 Expect(err).NotTo(HaveOccurred())
236 subscr := msgi.(*redis.Subscription)
237 Expect(subscr.Kind).To(Equal("unsubscribe"))
238 Expect(subscr.Channel).To(Equal("mychannel2"))
239 Expect(subscr.Count).To(Equal(0))
240 }
241
242 stats := client.PoolStats()
243 Expect(stats.Misses).To(Equal(uint32(1)))
244 })
245
246 It("should sharded pub/sub", func() {
247 pubsub := client.SSubscribe(ctx, "mychannel", "mychannel2")
248 defer pubsub.Close()
249
250 {
251 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
252 Expect(err).NotTo(HaveOccurred())
253 subscr := msgi.(*redis.Subscription)
254 Expect(subscr.Kind).To(Equal("ssubscribe"))
255 Expect(subscr.Channel).To(Equal("mychannel"))
256 Expect(subscr.Count).To(Equal(1))
257 }
258
259 {
260 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
261 Expect(err).NotTo(HaveOccurred())
262 subscr := msgi.(*redis.Subscription)
263 Expect(subscr.Kind).To(Equal("ssubscribe"))
264 Expect(subscr.Channel).To(Equal("mychannel2"))
265 Expect(subscr.Count).To(Equal(2))
266 }
267
268 {
269 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
270 Expect(err.(net.Error).Timeout()).To(Equal(true))
271 Expect(msgi).NotTo(HaveOccurred())
272 }
273
274 n, err := client.SPublish(ctx, "mychannel", "hello").Result()
275 Expect(err).NotTo(HaveOccurred())
276 Expect(n).To(Equal(int64(1)))
277
278 n, err = client.SPublish(ctx, "mychannel2", "hello2").Result()
279 Expect(err).NotTo(HaveOccurred())
280 Expect(n).To(Equal(int64(1)))
281
282 Expect(pubsub.SUnsubscribe(ctx, "mychannel", "mychannel2")).NotTo(HaveOccurred())
283
284 {
285 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
286 Expect(err).NotTo(HaveOccurred())
287 msg := msgi.(*redis.Message)
288 Expect(msg.Channel).To(Equal("mychannel"))
289 Expect(msg.Payload).To(Equal("hello"))
290 }
291
292 {
293 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
294 Expect(err).NotTo(HaveOccurred())
295 msg := msgi.(*redis.Message)
296 Expect(msg.Channel).To(Equal("mychannel2"))
297 Expect(msg.Payload).To(Equal("hello2"))
298 }
299
300 {
301 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
302 Expect(err).NotTo(HaveOccurred())
303 subscr := msgi.(*redis.Subscription)
304 Expect(subscr.Kind).To(Equal("sunsubscribe"))
305 Expect(subscr.Channel).To(Equal("mychannel"))
306 Expect(subscr.Count).To(Equal(1))
307 }
308
309 {
310 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
311 Expect(err).NotTo(HaveOccurred())
312 subscr := msgi.(*redis.Subscription)
313 Expect(subscr.Kind).To(Equal("sunsubscribe"))
314 Expect(subscr.Channel).To(Equal("mychannel2"))
315 Expect(subscr.Count).To(Equal(0))
316 }
317
318 stats := client.PoolStats()
319 Expect(stats.Misses).To(Equal(uint32(1)))
320 })
321
322 It("should ping/pong", func() {
323 pubsub := client.Subscribe(ctx, "mychannel")
324 defer pubsub.Close()
325
326 _, err := pubsub.ReceiveTimeout(ctx, time.Second)
327 Expect(err).NotTo(HaveOccurred())
328
329 err = pubsub.Ping(ctx, "")
330 Expect(err).NotTo(HaveOccurred())
331
332 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
333 Expect(err).NotTo(HaveOccurred())
334 pong := msgi.(*redis.Pong)
335 Expect(pong.Payload).To(Equal(""))
336 })
337
338 It("should ping/pong with payload", func() {
339 pubsub := client.Subscribe(ctx, "mychannel")
340 defer pubsub.Close()
341
342 _, err := pubsub.ReceiveTimeout(ctx, time.Second)
343 Expect(err).NotTo(HaveOccurred())
344
345 err = pubsub.Ping(ctx, "hello")
346 Expect(err).NotTo(HaveOccurred())
347
348 msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
349 Expect(err).NotTo(HaveOccurred())
350 pong := msgi.(*redis.Pong)
351 Expect(pong.Payload).To(Equal("hello"))
352 })
353
354 It("should multi-ReceiveMessage", func() {
355 pubsub := client.Subscribe(ctx, "mychannel")
356 defer pubsub.Close()
357
358 subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
359 Expect(err).NotTo(HaveOccurred())
360 Expect(subscr).To(Equal(&redis.Subscription{
361 Kind: "subscribe",
362 Channel: "mychannel",
363 Count: 1,
364 }))
365
366 err = client.Publish(ctx, "mychannel", "hello").Err()
367 Expect(err).NotTo(HaveOccurred())
368
369 err = client.Publish(ctx, "mychannel", "world").Err()
370 Expect(err).NotTo(HaveOccurred())
371
372 msg, err := pubsub.ReceiveMessage(ctx)
373 Expect(err).NotTo(HaveOccurred())
374 Expect(msg.Channel).To(Equal("mychannel"))
375 Expect(msg.Payload).To(Equal("hello"))
376
377 msg, err = pubsub.ReceiveMessage(ctx)
378 Expect(err).NotTo(HaveOccurred())
379 Expect(msg.Channel).To(Equal("mychannel"))
380 Expect(msg.Payload).To(Equal("world"))
381 })
382
383 It("returns an error when subscribe fails", func() {
384 pubsub := client.Subscribe(ctx)
385 defer pubsub.Close()
386
387 pubsub.SetNetConn(&badConn{
388 readErr: io.EOF,
389 writeErr: io.EOF,
390 })
391
392 err := pubsub.Subscribe(ctx, "mychannel")
393 Expect(err).To(MatchError("EOF"))
394
395 err = pubsub.Subscribe(ctx, "mychannel")
396 Expect(err).NotTo(HaveOccurred())
397 })
398
399 expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
400 pubsub.SetNetConn(&badConn{
401 readErr: io.EOF,
402 writeErr: io.EOF,
403 })
404
405 step := make(chan struct{}, 3)
406
407 go func() {
408 defer GinkgoRecover()
409
410 Eventually(step).Should(Receive())
411 err := client.Publish(ctx, "mychannel", "hello").Err()
412 Expect(err).NotTo(HaveOccurred())
413 step <- struct{}{}
414 }()
415
416 _, err := pubsub.ReceiveMessage(ctx)
417 Expect(err).To(Equal(io.EOF))
418 step <- struct{}{}
419
420 msg, err := pubsub.ReceiveMessage(ctx)
421 Expect(err).NotTo(HaveOccurred())
422 Expect(msg.Channel).To(Equal("mychannel"))
423 Expect(msg.Payload).To(Equal("hello"))
424
425 Eventually(step).Should(Receive())
426 }
427
428 It("Subscribe should reconnect on ReceiveMessage error", func() {
429 pubsub := client.Subscribe(ctx, "mychannel")
430 defer pubsub.Close()
431
432 subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
433 Expect(err).NotTo(HaveOccurred())
434 Expect(subscr).To(Equal(&redis.Subscription{
435 Kind: "subscribe",
436 Channel: "mychannel",
437 Count: 1,
438 }))
439
440 expectReceiveMessageOnError(pubsub)
441 })
442
443 It("PSubscribe should reconnect on ReceiveMessage error", func() {
444 pubsub := client.PSubscribe(ctx, "mychannel")
445 defer pubsub.Close()
446
447 subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
448 Expect(err).NotTo(HaveOccurred())
449 Expect(subscr).To(Equal(&redis.Subscription{
450 Kind: "psubscribe",
451 Channel: "mychannel",
452 Count: 1,
453 }))
454
455 expectReceiveMessageOnError(pubsub)
456 })
457
458 It("should return on Close", func() {
459 pubsub := client.Subscribe(ctx, "mychannel")
460 defer pubsub.Close()
461
462 var wg sync.WaitGroup
463 wg.Add(1)
464 go func() {
465 defer GinkgoRecover()
466
467 wg.Done()
468 defer wg.Done()
469
470 _, err := pubsub.ReceiveMessage(ctx)
471 Expect(err).To(HaveOccurred())
472 Expect(err.Error()).To(SatisfyAny(
473 Equal("redis: client is closed"),
474 ContainSubstring("use of closed network connection"),
475 ))
476 }()
477
478 wg.Wait()
479 wg.Add(1)
480
481 Expect(pubsub.Close()).NotTo(HaveOccurred())
482
483 wg.Wait()
484 })
485
486 It("should ReceiveMessage without a subscription", func() {
487 timeout := 100 * time.Millisecond
488
489 pubsub := client.Subscribe(ctx)
490 defer pubsub.Close()
491
492 var wg sync.WaitGroup
493 wg.Add(1)
494 go func() {
495 defer GinkgoRecover()
496 defer wg.Done()
497
498 time.Sleep(timeout)
499
500 err := pubsub.Subscribe(ctx, "mychannel")
501 Expect(err).NotTo(HaveOccurred())
502
503 time.Sleep(timeout)
504
505 err = client.Publish(ctx, "mychannel", "hello").Err()
506 Expect(err).NotTo(HaveOccurred())
507 }()
508
509 msg, err := pubsub.ReceiveMessage(ctx)
510 Expect(err).NotTo(HaveOccurred())
511 Expect(msg.Channel).To(Equal("mychannel"))
512 Expect(msg.Payload).To(Equal("hello"))
513
514 wg.Wait()
515 })
516
517 It("handles big message payload", func() {
518 pubsub := client.Subscribe(ctx, "mychannel")
519 defer pubsub.Close()
520
521 ch := pubsub.Channel()
522
523 bigVal := bigVal()
524 err := client.Publish(ctx, "mychannel", bigVal).Err()
525 Expect(err).NotTo(HaveOccurred())
526
527 var msg *redis.Message
528 Eventually(ch).Should(Receive(&msg))
529 Expect(msg.Channel).To(Equal("mychannel"))
530 Expect(msg.Payload).To(Equal(string(bigVal)))
531 })
532
533 It("supports concurrent Ping and Receive", func() {
534 const N = 100
535
536 pubsub := client.Subscribe(ctx, "mychannel")
537 defer pubsub.Close()
538
539 done := make(chan struct{})
540 go func() {
541 defer GinkgoRecover()
542
543 for i := 0; i < N; i++ {
544 _, err := pubsub.ReceiveTimeout(ctx, 5*time.Second)
545 Expect(err).NotTo(HaveOccurred())
546 }
547 close(done)
548 }()
549
550 for i := 0; i < N; i++ {
551 err := pubsub.Ping(ctx)
552 Expect(err).NotTo(HaveOccurred())
553 }
554
555 select {
556 case <-done:
557 case <-time.After(30 * time.Second):
558 Fail("timeout")
559 }
560 })
561
562 It("should ChannelMessage", func() {
563 pubsub := client.Subscribe(ctx, "mychannel")
564 defer pubsub.Close()
565
566 ch := pubsub.Channel(
567 redis.WithChannelSize(10),
568 redis.WithChannelHealthCheckInterval(time.Second),
569 )
570
571 text := "test channel message"
572 err := client.Publish(ctx, "mychannel", text).Err()
573 Expect(err).NotTo(HaveOccurred())
574
575 var msg *redis.Message
576 Eventually(ch).Should(Receive(&msg))
577 Expect(msg.Channel).To(Equal("mychannel"))
578 Expect(msg.Payload).To(Equal(text))
579 })
580 })
581
View as plain text