1 package redis_test
2
3 import (
4 "errors"
5 "strconv"
6
7 . "github.com/bsm/ginkgo/v2"
8 . "github.com/bsm/gomega"
9
10 "github.com/redis/go-redis/v9"
11 )
12
13 var _ = Describe("pipelining", func() {
14 var client *redis.Client
15 var pipe *redis.Pipeline
16
17 BeforeEach(func() {
18 client = redis.NewClient(redisOptions())
19 Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
20 })
21
22 AfterEach(func() {
23 Expect(client.Close()).NotTo(HaveOccurred())
24 })
25
26 It("supports block style", func() {
27 var get *redis.StringCmd
28 cmds, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
29 get = pipe.Get(ctx, "foo")
30 return nil
31 })
32 Expect(err).To(Equal(redis.Nil))
33 Expect(cmds).To(HaveLen(1))
34 Expect(cmds[0]).To(Equal(get))
35 Expect(get.Err()).To(Equal(redis.Nil))
36 Expect(get.Val()).To(Equal(""))
37 })
38
39 It("exports queued commands", func() {
40 p := client.Pipeline()
41 cmds := p.Cmds()
42 Expect(cmds).To(BeEmpty())
43
44 p.Set(ctx, "foo", "bar", 0)
45 p.Get(ctx, "foo")
46 cmds = p.Cmds()
47 Expect(cmds).To(HaveLen(p.Len()))
48 Expect(cmds[0].Name()).To(Equal("set"))
49 Expect(cmds[1].Name()).To(Equal("get"))
50
51 cmds, err := p.Exec(ctx)
52 Expect(err).NotTo(HaveOccurred())
53 Expect(cmds).To(HaveLen(2))
54 Expect(cmds[0].Name()).To(Equal("set"))
55 Expect(cmds[0].(*redis.StatusCmd).Val()).To(Equal("OK"))
56 Expect(cmds[1].Name()).To(Equal("get"))
57 Expect(cmds[1].(*redis.StringCmd).Val()).To(Equal("bar"))
58
59 cmds = p.Cmds()
60 Expect(cmds).To(BeEmpty())
61 })
62
63 assertPipeline := func() {
64 It("returns no errors when there are no commands", func() {
65 _, err := pipe.Exec(ctx)
66 Expect(err).NotTo(HaveOccurred())
67 })
68
69 It("discards queued commands", func() {
70 pipe.Get(ctx, "key")
71 pipe.Discard()
72 cmds, err := pipe.Exec(ctx)
73 Expect(err).NotTo(HaveOccurred())
74 Expect(cmds).To(BeNil())
75 })
76
77 It("handles val/err", func() {
78 err := client.Set(ctx, "key", "value", 0).Err()
79 Expect(err).NotTo(HaveOccurred())
80
81 get := pipe.Get(ctx, "key")
82 cmds, err := pipe.Exec(ctx)
83 Expect(err).NotTo(HaveOccurred())
84 Expect(cmds).To(HaveLen(1))
85
86 val, err := get.Result()
87 Expect(err).NotTo(HaveOccurred())
88 Expect(val).To(Equal("value"))
89 })
90
91 It("supports custom command", func() {
92 pipe.Do(ctx, "ping")
93 cmds, err := pipe.Exec(ctx)
94 Expect(err).NotTo(HaveOccurred())
95 Expect(cmds).To(HaveLen(1))
96 })
97
98 It("handles large pipelines", Label("NonRedisEnterprise"), func() {
99 for callCount := 1; callCount < 16; callCount++ {
100 for i := 1; i <= callCount; i++ {
101 pipe.SetNX(ctx, strconv.Itoa(i)+"_key", strconv.Itoa(i)+"_value", 0)
102 }
103
104 cmds, err := pipe.Exec(ctx)
105 Expect(err).NotTo(HaveOccurred())
106 Expect(cmds).To(HaveLen(callCount))
107 for _, cmd := range cmds {
108 Expect(cmd).To(BeAssignableToTypeOf(&redis.BoolCmd{}))
109 }
110 }
111 })
112
113 It("should Exec, not Do", func() {
114 err := pipe.Do(ctx).Err()
115 Expect(err).To(Equal(errors.New("redis: please enter the command to be executed")))
116 })
117
118 It("should process", func() {
119 err := pipe.Process(ctx, redis.NewCmd(ctx, "asking"))
120 Expect(err).To(BeNil())
121 Expect(pipe.Cmds()).To(HaveLen(1))
122 })
123
124 It("should batchProcess", func() {
125 err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"))
126 Expect(err).To(BeNil())
127 Expect(pipe.Cmds()).To(HaveLen(1))
128
129 pipe.Discard()
130 Expect(pipe.Cmds()).To(HaveLen(0))
131
132 err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value"))
133 Expect(err).To(BeNil())
134 Expect(pipe.Cmds()).To(HaveLen(2))
135 })
136 }
137
138 Describe("Pipeline", func() {
139 BeforeEach(func() {
140 pipe = client.Pipeline().(*redis.Pipeline)
141 })
142
143 assertPipeline()
144 })
145
146 Describe("TxPipeline", func() {
147 BeforeEach(func() {
148 pipe = client.TxPipeline().(*redis.Pipeline)
149 })
150
151 assertPipeline()
152 })
153 })
154
View as plain text