...

Source file src/github.com/redis/go-redis/v9/pipeline_test.go

Documentation: github.com/redis/go-redis/v9

     1  package redis_test
     2  
     3  import (
     4  	"errors"
     5  	"strconv"
     6  
     7  	. "github.com/bsm/ginkgo/v2"
     8  	. "github.com/bsm/gomega"
     9  
    10  	"github.com/redis/go-redis/v9"
    11  )
    12  
    13  var _ = Describe("pipelining", func() {
    14  	var client *redis.Client
    15  	var pipe *redis.Pipeline
    16  
    17  	BeforeEach(func() {
    18  		client = redis.NewClient(redisOptions())
    19  		Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
    20  	})
    21  
    22  	AfterEach(func() {
    23  		Expect(client.Close()).NotTo(HaveOccurred())
    24  	})
    25  
    26  	It("supports block style", func() {
    27  		var get *redis.StringCmd
    28  		cmds, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    29  			get = pipe.Get(ctx, "foo")
    30  			return nil
    31  		})
    32  		Expect(err).To(Equal(redis.Nil))
    33  		Expect(cmds).To(HaveLen(1))
    34  		Expect(cmds[0]).To(Equal(get))
    35  		Expect(get.Err()).To(Equal(redis.Nil))
    36  		Expect(get.Val()).To(Equal(""))
    37  	})
    38  
    39  	It("exports queued commands", func() {
    40  		p := client.Pipeline()
    41  		cmds := p.Cmds()
    42  		Expect(cmds).To(BeEmpty())
    43  
    44  		p.Set(ctx, "foo", "bar", 0)
    45  		p.Get(ctx, "foo")
    46  		cmds = p.Cmds()
    47  		Expect(cmds).To(HaveLen(p.Len()))
    48  		Expect(cmds[0].Name()).To(Equal("set"))
    49  		Expect(cmds[1].Name()).To(Equal("get"))
    50  
    51  		cmds, err := p.Exec(ctx)
    52  		Expect(err).NotTo(HaveOccurred())
    53  		Expect(cmds).To(HaveLen(2))
    54  		Expect(cmds[0].Name()).To(Equal("set"))
    55  		Expect(cmds[0].(*redis.StatusCmd).Val()).To(Equal("OK"))
    56  		Expect(cmds[1].Name()).To(Equal("get"))
    57  		Expect(cmds[1].(*redis.StringCmd).Val()).To(Equal("bar"))
    58  
    59  		cmds = p.Cmds()
    60  		Expect(cmds).To(BeEmpty())
    61  	})
    62  
    63  	assertPipeline := func() {
    64  		It("returns no errors when there are no commands", func() {
    65  			_, err := pipe.Exec(ctx)
    66  			Expect(err).NotTo(HaveOccurred())
    67  		})
    68  
    69  		It("discards queued commands", func() {
    70  			pipe.Get(ctx, "key")
    71  			pipe.Discard()
    72  			cmds, err := pipe.Exec(ctx)
    73  			Expect(err).NotTo(HaveOccurred())
    74  			Expect(cmds).To(BeNil())
    75  		})
    76  
    77  		It("handles val/err", func() {
    78  			err := client.Set(ctx, "key", "value", 0).Err()
    79  			Expect(err).NotTo(HaveOccurred())
    80  
    81  			get := pipe.Get(ctx, "key")
    82  			cmds, err := pipe.Exec(ctx)
    83  			Expect(err).NotTo(HaveOccurred())
    84  			Expect(cmds).To(HaveLen(1))
    85  
    86  			val, err := get.Result()
    87  			Expect(err).NotTo(HaveOccurred())
    88  			Expect(val).To(Equal("value"))
    89  		})
    90  
    91  		It("supports custom command", func() {
    92  			pipe.Do(ctx, "ping")
    93  			cmds, err := pipe.Exec(ctx)
    94  			Expect(err).NotTo(HaveOccurred())
    95  			Expect(cmds).To(HaveLen(1))
    96  		})
    97  
    98  		It("handles large pipelines", Label("NonRedisEnterprise"), func() {
    99  			for callCount := 1; callCount < 16; callCount++ {
   100  				for i := 1; i <= callCount; i++ {
   101  					pipe.SetNX(ctx, strconv.Itoa(i)+"_key", strconv.Itoa(i)+"_value", 0)
   102  				}
   103  
   104  				cmds, err := pipe.Exec(ctx)
   105  				Expect(err).NotTo(HaveOccurred())
   106  				Expect(cmds).To(HaveLen(callCount))
   107  				for _, cmd := range cmds {
   108  					Expect(cmd).To(BeAssignableToTypeOf(&redis.BoolCmd{}))
   109  				}
   110  			}
   111  		})
   112  
   113  		It("should Exec, not Do", func() {
   114  			err := pipe.Do(ctx).Err()
   115  			Expect(err).To(Equal(errors.New("redis: please enter the command to be executed")))
   116  		})
   117  
   118  		It("should process", func() {
   119  			err := pipe.Process(ctx, redis.NewCmd(ctx, "asking"))
   120  			Expect(err).To(BeNil())
   121  			Expect(pipe.Cmds()).To(HaveLen(1))
   122  		})
   123  
   124  		It("should batchProcess", func() {
   125  			err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"))
   126  			Expect(err).To(BeNil())
   127  			Expect(pipe.Cmds()).To(HaveLen(1))
   128  
   129  			pipe.Discard()
   130  			Expect(pipe.Cmds()).To(HaveLen(0))
   131  
   132  			err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value"))
   133  			Expect(err).To(BeNil())
   134  			Expect(pipe.Cmds()).To(HaveLen(2))
   135  		})
   136  	}
   137  
   138  	Describe("Pipeline", func() {
   139  		BeforeEach(func() {
   140  			pipe = client.Pipeline().(*redis.Pipeline)
   141  		})
   142  
   143  		assertPipeline()
   144  	})
   145  
   146  	Describe("TxPipeline", func() {
   147  		BeforeEach(func() {
   148  			pipe = client.TxPipeline().(*redis.Pipeline)
   149  		})
   150  
   151  		assertPipeline()
   152  	})
   153  })
   154  

View as plain text