1 package pool_test
2
3 import (
4 "context"
5 "net"
6 "sync"
7 "testing"
8 "time"
9
10 . "github.com/bsm/ginkgo/v2"
11 . "github.com/bsm/gomega"
12
13 "github.com/redis/go-redis/v9/internal/pool"
14 )
15
16 var _ = Describe("ConnPool", func() {
17 ctx := context.Background()
18 var connPool *pool.ConnPool
19
20 BeforeEach(func() {
21 connPool = pool.NewConnPool(&pool.Options{
22 Dialer: dummyDialer,
23 PoolSize: 10,
24 PoolTimeout: time.Hour,
25 DialTimeout: 1 * time.Second,
26 ConnMaxIdleTime: time.Millisecond,
27 })
28 })
29
30 AfterEach(func() {
31 connPool.Close()
32 })
33
34 It("should safe close", func() {
35 const minIdleConns = 10
36
37 var (
38 wg sync.WaitGroup
39 closedChan = make(chan struct{})
40 )
41 wg.Add(minIdleConns)
42 connPool = pool.NewConnPool(&pool.Options{
43 Dialer: func(ctx context.Context) (net.Conn, error) {
44 wg.Done()
45 <-closedChan
46 return &net.TCPConn{}, nil
47 },
48 PoolSize: 10,
49 PoolTimeout: time.Hour,
50 DialTimeout: 1 * time.Second,
51 ConnMaxIdleTime: time.Millisecond,
52 MinIdleConns: minIdleConns,
53 })
54 wg.Wait()
55 Expect(connPool.Close()).NotTo(HaveOccurred())
56 close(closedChan)
57
58
59 time.Sleep(time.Second)
60
61 Expect(connPool.Stats()).To(Equal(&pool.Stats{
62 Hits: 0,
63 Misses: 0,
64 Timeouts: 0,
65 WaitCount: 0,
66 WaitDurationNs: 0,
67 TotalConns: 0,
68 IdleConns: 0,
69 StaleConns: 0,
70 }))
71 })
72
73 It("should unblock client when conn is removed", func() {
74
75 cn, err := connPool.Get(ctx)
76 Expect(err).NotTo(HaveOccurred())
77
78
79 var cns []*pool.Conn
80 for i := 0; i < 9; i++ {
81 cn, err := connPool.Get(ctx)
82 Expect(err).NotTo(HaveOccurred())
83 cns = append(cns, cn)
84 }
85
86 started := make(chan bool, 1)
87 done := make(chan bool, 1)
88 go func() {
89 defer GinkgoRecover()
90
91 started <- true
92 _, err := connPool.Get(ctx)
93 Expect(err).NotTo(HaveOccurred())
94 done <- true
95
96 connPool.Put(ctx, cn)
97 }()
98 <-started
99
100
101 select {
102 case <-done:
103 Fail("Get is not blocked")
104 case <-time.After(time.Millisecond):
105
106 }
107
108 connPool.Remove(ctx, cn, nil)
109
110
111 select {
112 case <-done:
113
114 case <-time.After(time.Second):
115 Fail("Get is not unblocked")
116 }
117
118 for _, cn := range cns {
119 connPool.Put(ctx, cn)
120 }
121 })
122 })
123
124 var _ = Describe("MinIdleConns", func() {
125 const poolSize = 100
126 ctx := context.Background()
127 var minIdleConns int
128 var connPool *pool.ConnPool
129
130 newConnPool := func() *pool.ConnPool {
131 connPool := pool.NewConnPool(&pool.Options{
132 Dialer: dummyDialer,
133 PoolSize: poolSize,
134 MinIdleConns: minIdleConns,
135 PoolTimeout: 100 * time.Millisecond,
136 DialTimeout: 1 * time.Second,
137 ConnMaxIdleTime: -1,
138 })
139 Eventually(func() int {
140 return connPool.Len()
141 }).Should(Equal(minIdleConns))
142 return connPool
143 }
144
145 assert := func() {
146 It("has idle connections when created", func() {
147 Expect(connPool.Len()).To(Equal(minIdleConns))
148 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
149 })
150
151 Context("after Get", func() {
152 var cn *pool.Conn
153
154 BeforeEach(func() {
155 var err error
156 cn, err = connPool.Get(ctx)
157 Expect(err).NotTo(HaveOccurred())
158
159 Eventually(func() int {
160 return connPool.Len()
161 }).Should(Equal(minIdleConns + 1))
162 })
163
164 It("has idle connections", func() {
165 Expect(connPool.Len()).To(Equal(minIdleConns + 1))
166 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
167 })
168
169 Context("after Remove", func() {
170 BeforeEach(func() {
171 connPool.Remove(ctx, cn, nil)
172 })
173
174 It("has idle connections", func() {
175 Expect(connPool.Len()).To(Equal(minIdleConns))
176 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
177 })
178 })
179 })
180
181 Describe("Get does not exceed pool size", func() {
182 var mu sync.RWMutex
183 var cns []*pool.Conn
184
185 BeforeEach(func() {
186 cns = make([]*pool.Conn, 0)
187
188 perform(poolSize, func(_ int) {
189 defer GinkgoRecover()
190
191 cn, err := connPool.Get(ctx)
192 Expect(err).NotTo(HaveOccurred())
193 mu.Lock()
194 cns = append(cns, cn)
195 mu.Unlock()
196 })
197
198 Eventually(func() int {
199 return connPool.Len()
200 }).Should(BeNumerically(">=", poolSize))
201 })
202
203 It("Get is blocked", func() {
204 done := make(chan struct{})
205 go func() {
206 connPool.Get(ctx)
207 close(done)
208 }()
209
210 select {
211 case <-done:
212 Fail("Get is not blocked")
213 case <-time.After(time.Millisecond):
214
215 }
216
217 select {
218 case <-done:
219
220 case <-time.After(time.Second):
221 Fail("Get is not unblocked")
222 }
223 })
224
225 Context("after Put", func() {
226 BeforeEach(func() {
227 perform(len(cns), func(i int) {
228 mu.RLock()
229 connPool.Put(ctx, cns[i])
230 mu.RUnlock()
231 })
232
233 Eventually(func() int {
234 return connPool.Len()
235 }).Should(Equal(poolSize))
236 })
237
238 It("pool.Len is back to normal", func() {
239 Expect(connPool.Len()).To(Equal(poolSize))
240 Expect(connPool.IdleLen()).To(Equal(poolSize))
241 })
242 })
243
244 Context("after Remove", func() {
245 BeforeEach(func() {
246 perform(len(cns), func(i int) {
247 mu.RLock()
248 connPool.Remove(ctx, cns[i], nil)
249 mu.RUnlock()
250 })
251
252 Eventually(func() int {
253 return connPool.Len()
254 }).Should(Equal(minIdleConns))
255 })
256
257 It("has idle connections", func() {
258 Expect(connPool.Len()).To(Equal(minIdleConns))
259 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
260 })
261 })
262 })
263 }
264
265 Context("minIdleConns = 1", func() {
266 BeforeEach(func() {
267 minIdleConns = 1
268 connPool = newConnPool()
269 })
270
271 AfterEach(func() {
272 connPool.Close()
273 })
274
275 assert()
276 })
277
278 Context("minIdleConns = 32", func() {
279 BeforeEach(func() {
280 minIdleConns = 32
281 connPool = newConnPool()
282 })
283
284 AfterEach(func() {
285 connPool.Close()
286 })
287
288 assert()
289 })
290 })
291
292 var _ = Describe("race", func() {
293 ctx := context.Background()
294 var connPool *pool.ConnPool
295 var C, N int
296
297 BeforeEach(func() {
298 C, N = 10, 1000
299 if testing.Short() {
300 C = 2
301 N = 50
302 }
303 })
304
305 AfterEach(func() {
306 connPool.Close()
307 })
308
309 It("does not happen on Get, Put, and Remove", func() {
310 connPool = pool.NewConnPool(&pool.Options{
311 Dialer: dummyDialer,
312 PoolSize: 10,
313 PoolTimeout: time.Minute,
314 DialTimeout: 1 * time.Second,
315 ConnMaxIdleTime: time.Millisecond,
316 })
317
318 perform(C, func(id int) {
319 for i := 0; i < N; i++ {
320 cn, err := connPool.Get(ctx)
321 Expect(err).NotTo(HaveOccurred())
322 if err == nil {
323 connPool.Put(ctx, cn)
324 }
325 }
326 }, func(id int) {
327 for i := 0; i < N; i++ {
328 cn, err := connPool.Get(ctx)
329 Expect(err).NotTo(HaveOccurred())
330 if err == nil {
331 connPool.Remove(ctx, cn, nil)
332 }
333 }
334 })
335 })
336
337 It("limit the number of connections", func() {
338 opt := &pool.Options{
339 Dialer: func(ctx context.Context) (net.Conn, error) {
340 return &net.TCPConn{}, nil
341 },
342 PoolSize: 1000,
343 MinIdleConns: 50,
344 PoolTimeout: 3 * time.Second,
345 DialTimeout: 1 * time.Second,
346 }
347 p := pool.NewConnPool(opt)
348
349 var wg sync.WaitGroup
350 for i := 0; i < opt.PoolSize; i++ {
351 wg.Add(1)
352 go func() {
353 defer wg.Done()
354 _, _ = p.Get(ctx)
355 }()
356 }
357 wg.Wait()
358
359 stats := p.Stats()
360 Expect(stats.IdleConns).To(Equal(uint32(0)))
361 Expect(stats.TotalConns).To(Equal(uint32(opt.PoolSize)))
362 })
363
364 It("recover addIdleConn panic", func() {
365 opt := &pool.Options{
366 Dialer: func(ctx context.Context) (net.Conn, error) {
367 panic("test panic")
368 },
369 PoolSize: 100,
370 MinIdleConns: 30,
371 }
372 p := pool.NewConnPool(opt)
373
374 p.CheckMinIdleConns()
375
376 Eventually(func() bool {
377 state := p.Stats()
378 return state.TotalConns == 0 && state.IdleConns == 0 && p.QueueLen() == 0
379 }, "3s", "50ms").Should(BeTrue())
380 })
381
382 It("wait", func() {
383 opt := &pool.Options{
384 Dialer: func(ctx context.Context) (net.Conn, error) {
385 return &net.TCPConn{}, nil
386 },
387 PoolSize: 1,
388 PoolTimeout: 3 * time.Second,
389 }
390 p := pool.NewConnPool(opt)
391
392 wait := make(chan struct{})
393 conn, _ := p.Get(ctx)
394 go func() {
395 _, _ = p.Get(ctx)
396 wait <- struct{}{}
397 }()
398 time.Sleep(time.Second)
399 p.Put(ctx, conn)
400 <-wait
401
402 stats := p.Stats()
403 Expect(stats.IdleConns).To(Equal(uint32(0)))
404 Expect(stats.TotalConns).To(Equal(uint32(1)))
405 Expect(stats.WaitCount).To(Equal(uint32(1)))
406 Expect(stats.WaitDurationNs).To(BeNumerically("~", time.Second.Nanoseconds(), 100*time.Millisecond.Nanoseconds()))
407 })
408
409 It("timeout", func() {
410 testPoolTimeout := 1 * time.Second
411 opt := &pool.Options{
412 Dialer: func(ctx context.Context) (net.Conn, error) {
413
414 time.Sleep(3 * testPoolTimeout)
415
416 return &net.TCPConn{}, nil
417 },
418 PoolSize: 1,
419 PoolTimeout: testPoolTimeout,
420 }
421 p := pool.NewConnPool(opt)
422
423 stats := p.Stats()
424 Expect(stats.Timeouts).To(Equal(uint32(0)))
425
426 conn, err := p.Get(ctx)
427 Expect(err).NotTo(HaveOccurred())
428 _, err = p.Get(ctx)
429 Expect(err).To(MatchError(pool.ErrPoolTimeout))
430 p.Put(ctx, conn)
431 _, err = p.Get(ctx)
432 Expect(err).NotTo(HaveOccurred())
433
434 stats = p.Stats()
435 Expect(stats.Timeouts).To(Equal(uint32(1)))
436 })
437 })
438
View as plain text