...

Source file src/github.com/redis/go-redis/v9/internal/pool/pool_test.go

Documentation: github.com/redis/go-redis/v9/internal/pool

     1  package pool_test
     2  
     3  import (
     4  	"context"
     5  	"net"
     6  	"sync"
     7  	"testing"
     8  	"time"
     9  
    10  	. "github.com/bsm/ginkgo/v2"
    11  	. "github.com/bsm/gomega"
    12  
    13  	"github.com/redis/go-redis/v9/internal/pool"
    14  )
    15  
    16  var _ = Describe("ConnPool", func() {
    17  	ctx := context.Background()
    18  	var connPool *pool.ConnPool
    19  
    20  	BeforeEach(func() {
    21  		connPool = pool.NewConnPool(&pool.Options{
    22  			Dialer:          dummyDialer,
    23  			PoolSize:        10,
    24  			PoolTimeout:     time.Hour,
    25  			DialTimeout:     1 * time.Second,
    26  			ConnMaxIdleTime: time.Millisecond,
    27  		})
    28  	})
    29  
    30  	AfterEach(func() {
    31  		connPool.Close()
    32  	})
    33  
    34  	It("should safe close", func() {
    35  		const minIdleConns = 10
    36  
    37  		var (
    38  			wg         sync.WaitGroup
    39  			closedChan = make(chan struct{})
    40  		)
    41  		wg.Add(minIdleConns)
    42  		connPool = pool.NewConnPool(&pool.Options{
    43  			Dialer: func(ctx context.Context) (net.Conn, error) {
    44  				wg.Done()
    45  				<-closedChan
    46  				return &net.TCPConn{}, nil
    47  			},
    48  			PoolSize:        10,
    49  			PoolTimeout:     time.Hour,
    50  			DialTimeout:     1 * time.Second,
    51  			ConnMaxIdleTime: time.Millisecond,
    52  			MinIdleConns:    minIdleConns,
    53  		})
    54  		wg.Wait()
    55  		Expect(connPool.Close()).NotTo(HaveOccurred())
    56  		close(closedChan)
    57  
    58  		// We wait for 1 second and believe that checkMinIdleConns has been executed.
    59  		time.Sleep(time.Second)
    60  
    61  		Expect(connPool.Stats()).To(Equal(&pool.Stats{
    62  			Hits:           0,
    63  			Misses:         0,
    64  			Timeouts:       0,
    65  			WaitCount:      0,
    66  			WaitDurationNs: 0,
    67  			TotalConns:     0,
    68  			IdleConns:      0,
    69  			StaleConns:     0,
    70  		}))
    71  	})
    72  
    73  	It("should unblock client when conn is removed", func() {
    74  		// Reserve one connection.
    75  		cn, err := connPool.Get(ctx)
    76  		Expect(err).NotTo(HaveOccurred())
    77  
    78  		// Reserve all other connections.
    79  		var cns []*pool.Conn
    80  		for i := 0; i < 9; i++ {
    81  			cn, err := connPool.Get(ctx)
    82  			Expect(err).NotTo(HaveOccurred())
    83  			cns = append(cns, cn)
    84  		}
    85  
    86  		started := make(chan bool, 1)
    87  		done := make(chan bool, 1)
    88  		go func() {
    89  			defer GinkgoRecover()
    90  
    91  			started <- true
    92  			_, err := connPool.Get(ctx)
    93  			Expect(err).NotTo(HaveOccurred())
    94  			done <- true
    95  
    96  			connPool.Put(ctx, cn)
    97  		}()
    98  		<-started
    99  
   100  		// Check that Get is blocked.
   101  		select {
   102  		case <-done:
   103  			Fail("Get is not blocked")
   104  		case <-time.After(time.Millisecond):
   105  			// ok
   106  		}
   107  
   108  		connPool.Remove(ctx, cn, nil)
   109  
   110  		// Check that Get is unblocked.
   111  		select {
   112  		case <-done:
   113  			// ok
   114  		case <-time.After(time.Second):
   115  			Fail("Get is not unblocked")
   116  		}
   117  
   118  		for _, cn := range cns {
   119  			connPool.Put(ctx, cn)
   120  		}
   121  	})
   122  })
   123  
   124  var _ = Describe("MinIdleConns", func() {
   125  	const poolSize = 100
   126  	ctx := context.Background()
   127  	var minIdleConns int
   128  	var connPool *pool.ConnPool
   129  
   130  	newConnPool := func() *pool.ConnPool {
   131  		connPool := pool.NewConnPool(&pool.Options{
   132  			Dialer:          dummyDialer,
   133  			PoolSize:        poolSize,
   134  			MinIdleConns:    minIdleConns,
   135  			PoolTimeout:     100 * time.Millisecond,
   136  			DialTimeout:     1 * time.Second,
   137  			ConnMaxIdleTime: -1,
   138  		})
   139  		Eventually(func() int {
   140  			return connPool.Len()
   141  		}).Should(Equal(minIdleConns))
   142  		return connPool
   143  	}
   144  
   145  	assert := func() {
   146  		It("has idle connections when created", func() {
   147  			Expect(connPool.Len()).To(Equal(minIdleConns))
   148  			Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   149  		})
   150  
   151  		Context("after Get", func() {
   152  			var cn *pool.Conn
   153  
   154  			BeforeEach(func() {
   155  				var err error
   156  				cn, err = connPool.Get(ctx)
   157  				Expect(err).NotTo(HaveOccurred())
   158  
   159  				Eventually(func() int {
   160  					return connPool.Len()
   161  				}).Should(Equal(minIdleConns + 1))
   162  			})
   163  
   164  			It("has idle connections", func() {
   165  				Expect(connPool.Len()).To(Equal(minIdleConns + 1))
   166  				Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   167  			})
   168  
   169  			Context("after Remove", func() {
   170  				BeforeEach(func() {
   171  					connPool.Remove(ctx, cn, nil)
   172  				})
   173  
   174  				It("has idle connections", func() {
   175  					Expect(connPool.Len()).To(Equal(minIdleConns))
   176  					Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   177  				})
   178  			})
   179  		})
   180  
   181  		Describe("Get does not exceed pool size", func() {
   182  			var mu sync.RWMutex
   183  			var cns []*pool.Conn
   184  
   185  			BeforeEach(func() {
   186  				cns = make([]*pool.Conn, 0)
   187  
   188  				perform(poolSize, func(_ int) {
   189  					defer GinkgoRecover()
   190  
   191  					cn, err := connPool.Get(ctx)
   192  					Expect(err).NotTo(HaveOccurred())
   193  					mu.Lock()
   194  					cns = append(cns, cn)
   195  					mu.Unlock()
   196  				})
   197  
   198  				Eventually(func() int {
   199  					return connPool.Len()
   200  				}).Should(BeNumerically(">=", poolSize))
   201  			})
   202  
   203  			It("Get is blocked", func() {
   204  				done := make(chan struct{})
   205  				go func() {
   206  					connPool.Get(ctx)
   207  					close(done)
   208  				}()
   209  
   210  				select {
   211  				case <-done:
   212  					Fail("Get is not blocked")
   213  				case <-time.After(time.Millisecond):
   214  					// ok
   215  				}
   216  
   217  				select {
   218  				case <-done:
   219  					// ok
   220  				case <-time.After(time.Second):
   221  					Fail("Get is not unblocked")
   222  				}
   223  			})
   224  
   225  			Context("after Put", func() {
   226  				BeforeEach(func() {
   227  					perform(len(cns), func(i int) {
   228  						mu.RLock()
   229  						connPool.Put(ctx, cns[i])
   230  						mu.RUnlock()
   231  					})
   232  
   233  					Eventually(func() int {
   234  						return connPool.Len()
   235  					}).Should(Equal(poolSize))
   236  				})
   237  
   238  				It("pool.Len is back to normal", func() {
   239  					Expect(connPool.Len()).To(Equal(poolSize))
   240  					Expect(connPool.IdleLen()).To(Equal(poolSize))
   241  				})
   242  			})
   243  
   244  			Context("after Remove", func() {
   245  				BeforeEach(func() {
   246  					perform(len(cns), func(i int) {
   247  						mu.RLock()
   248  						connPool.Remove(ctx, cns[i], nil)
   249  						mu.RUnlock()
   250  					})
   251  
   252  					Eventually(func() int {
   253  						return connPool.Len()
   254  					}).Should(Equal(minIdleConns))
   255  				})
   256  
   257  				It("has idle connections", func() {
   258  					Expect(connPool.Len()).To(Equal(minIdleConns))
   259  					Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   260  				})
   261  			})
   262  		})
   263  	}
   264  
   265  	Context("minIdleConns = 1", func() {
   266  		BeforeEach(func() {
   267  			minIdleConns = 1
   268  			connPool = newConnPool()
   269  		})
   270  
   271  		AfterEach(func() {
   272  			connPool.Close()
   273  		})
   274  
   275  		assert()
   276  	})
   277  
   278  	Context("minIdleConns = 32", func() {
   279  		BeforeEach(func() {
   280  			minIdleConns = 32
   281  			connPool = newConnPool()
   282  		})
   283  
   284  		AfterEach(func() {
   285  			connPool.Close()
   286  		})
   287  
   288  		assert()
   289  	})
   290  })
   291  
   292  var _ = Describe("race", func() {
   293  	ctx := context.Background()
   294  	var connPool *pool.ConnPool
   295  	var C, N int
   296  
   297  	BeforeEach(func() {
   298  		C, N = 10, 1000
   299  		if testing.Short() {
   300  			C = 2
   301  			N = 50
   302  		}
   303  	})
   304  
   305  	AfterEach(func() {
   306  		connPool.Close()
   307  	})
   308  
   309  	It("does not happen on Get, Put, and Remove", func() {
   310  		connPool = pool.NewConnPool(&pool.Options{
   311  			Dialer:          dummyDialer,
   312  			PoolSize:        10,
   313  			PoolTimeout:     time.Minute,
   314  			DialTimeout:     1 * time.Second,
   315  			ConnMaxIdleTime: time.Millisecond,
   316  		})
   317  
   318  		perform(C, func(id int) {
   319  			for i := 0; i < N; i++ {
   320  				cn, err := connPool.Get(ctx)
   321  				Expect(err).NotTo(HaveOccurred())
   322  				if err == nil {
   323  					connPool.Put(ctx, cn)
   324  				}
   325  			}
   326  		}, func(id int) {
   327  			for i := 0; i < N; i++ {
   328  				cn, err := connPool.Get(ctx)
   329  				Expect(err).NotTo(HaveOccurred())
   330  				if err == nil {
   331  					connPool.Remove(ctx, cn, nil)
   332  				}
   333  			}
   334  		})
   335  	})
   336  
   337  	It("limit the number of connections", func() {
   338  		opt := &pool.Options{
   339  			Dialer: func(ctx context.Context) (net.Conn, error) {
   340  				return &net.TCPConn{}, nil
   341  			},
   342  			PoolSize:     1000,
   343  			MinIdleConns: 50,
   344  			PoolTimeout:  3 * time.Second,
   345  			DialTimeout:  1 * time.Second,
   346  		}
   347  		p := pool.NewConnPool(opt)
   348  
   349  		var wg sync.WaitGroup
   350  		for i := 0; i < opt.PoolSize; i++ {
   351  			wg.Add(1)
   352  			go func() {
   353  				defer wg.Done()
   354  				_, _ = p.Get(ctx)
   355  			}()
   356  		}
   357  		wg.Wait()
   358  
   359  		stats := p.Stats()
   360  		Expect(stats.IdleConns).To(Equal(uint32(0)))
   361  		Expect(stats.TotalConns).To(Equal(uint32(opt.PoolSize)))
   362  	})
   363  
   364  	It("recover addIdleConn panic", func() {
   365  		opt := &pool.Options{
   366  			Dialer: func(ctx context.Context) (net.Conn, error) {
   367  				panic("test panic")
   368  			},
   369  			PoolSize:     100,
   370  			MinIdleConns: 30,
   371  		}
   372  		p := pool.NewConnPool(opt)
   373  
   374  		p.CheckMinIdleConns()
   375  
   376  		Eventually(func() bool {
   377  			state := p.Stats()
   378  			return state.TotalConns == 0 && state.IdleConns == 0 && p.QueueLen() == 0
   379  		}, "3s", "50ms").Should(BeTrue())
   380    })
   381    
   382  	It("wait", func() {
   383  		opt := &pool.Options{
   384  			Dialer: func(ctx context.Context) (net.Conn, error) {
   385  				return &net.TCPConn{}, nil
   386  			},
   387  			PoolSize:    1,
   388  			PoolTimeout: 3 * time.Second,
   389  		}
   390  		p := pool.NewConnPool(opt)
   391  
   392  		wait := make(chan struct{})
   393  		conn, _ := p.Get(ctx)
   394  		go func() {
   395  			_, _ = p.Get(ctx)
   396  			wait <- struct{}{}
   397  		}()
   398  		time.Sleep(time.Second)
   399  		p.Put(ctx, conn)
   400  		<-wait
   401  
   402  		stats := p.Stats()
   403  		Expect(stats.IdleConns).To(Equal(uint32(0)))
   404  		Expect(stats.TotalConns).To(Equal(uint32(1)))
   405  		Expect(stats.WaitCount).To(Equal(uint32(1)))
   406  		Expect(stats.WaitDurationNs).To(BeNumerically("~", time.Second.Nanoseconds(), 100*time.Millisecond.Nanoseconds()))
   407  	})
   408  
   409  	It("timeout", func() {
   410  		testPoolTimeout := 1 * time.Second
   411  		opt := &pool.Options{
   412  			Dialer: func(ctx context.Context) (net.Conn, error) {
   413  				// Artificial delay to force pool timeout
   414  				time.Sleep(3 * testPoolTimeout)
   415  
   416  				return &net.TCPConn{}, nil
   417  			},
   418  			PoolSize:    1,
   419  			PoolTimeout: testPoolTimeout,
   420  		}
   421  		p := pool.NewConnPool(opt)
   422  
   423  		stats := p.Stats()
   424  		Expect(stats.Timeouts).To(Equal(uint32(0)))
   425  
   426  		conn, err := p.Get(ctx)
   427  		Expect(err).NotTo(HaveOccurred())
   428  		_, err = p.Get(ctx)
   429  		Expect(err).To(MatchError(pool.ErrPoolTimeout))
   430  		p.Put(ctx, conn)
   431  		_, err = p.Get(ctx)
   432  		Expect(err).NotTo(HaveOccurred())
   433  
   434  		stats = p.Stats()
   435  		Expect(stats.Timeouts).To(Equal(uint32(1)))
   436  	})
   437  })
   438  

View as plain text