...
1 package redis
2
3 import (
4 "context"
5 "errors"
6 )
7
// pipelineExecer is the signature of the function a Pipeline uses to run its
// queued commands: Pipeline.Exec hands it the drained command slice and
// returns whatever error it reports.
type pipelineExecer func(context.Context, []Cmder) error
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Pipeliner is the command-queueing interface implemented by *Pipeline.
// It embeds StatefulCmdable and adds methods for inspecting, extending,
// dropping and executing the queue. The per-method notes below describe the
// behavior of the *Pipeline implementation in this file.
type Pipeliner interface {
	StatefulCmdable

	// Len returns the number of commands currently queued.
	Len() int

	// Do builds a command from args and queues it. With no args the returned
	// command carries an error and nothing is queued.
	Do(ctx context.Context, args ...interface{}) *Cmd

	// Process queues a single command; it does not execute it.
	Process(ctx context.Context, cmd Cmder) error

	// BatchProcess queues several commands at once, in the order given.
	BatchProcess(ctx context.Context, cmd ...Cmder) error

	// Discard drops every queued command without executing it.
	Discard()

	// Exec runs the queued commands and returns them together with the
	// execution error. With an empty queue it returns (nil, nil).
	Exec(ctx context.Context) ([]Cmder, error)

	// Cmds returns the commands currently queued.
	Cmds() []Cmder
}
48
// Compile-time check that *Pipeline satisfies Pipeliner.
var _ Pipeliner = (*Pipeline)(nil)
50
51
52
53
// Pipeline accumulates commands in cmds and hands the whole batch to exec
// when Exec is called. The embedded cmdable and statefulCmdable are pointed
// at Process by init, so commands issued through them are queued rather than
// run immediately.
//
// NOTE(review): no locking is visible in this file, so a Pipeline presumably
// must not be shared between goroutines without external synchronization —
// confirm against the callers.
type Pipeline struct {
	cmdable
	statefulCmdable

	exec pipelineExecer // runs a drained batch of commands; invoked by Exec
	cmds []Cmder        // commands queued since the last Exec or Discard
}
61
62 func (c *Pipeline) init() {
63 c.cmdable = c.Process
64 c.statefulCmdable = c.Process
65 }
66
67
68 func (c *Pipeline) Len() int {
69 return len(c.cmds)
70 }
71
72
73 func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
74 cmd := NewCmd(ctx, args...)
75 if len(args) == 0 {
76 cmd.SetErr(errors.New("redis: please enter the command to be executed"))
77 return cmd
78 }
79 _ = c.Process(ctx, cmd)
80 return cmd
81 }
82
83
84 func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
85 return c.BatchProcess(ctx, cmd)
86 }
87
88
89 func (c *Pipeline) BatchProcess(ctx context.Context, cmd ...Cmder) error {
90 c.cmds = append(c.cmds, cmd...)
91 return nil
92 }
93
94
95 func (c *Pipeline) Discard() {
96 c.cmds = c.cmds[:0]
97 }
98
99
100
101
102
103
104 func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
105 if len(c.cmds) == 0 {
106 return nil, nil
107 }
108
109 cmds := c.cmds
110 c.cmds = nil
111
112 return cmds, c.exec(ctx, cmds)
113 }
114
115 func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
116 if err := fn(c); err != nil {
117 return nil, err
118 }
119 return c.Exec(ctx)
120 }
121
122 func (c *Pipeline) Pipeline() Pipeliner {
123 return c
124 }
125
126 func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
127 return c.Pipelined(ctx, fn)
128 }
129
130 func (c *Pipeline) TxPipeline() Pipeliner {
131 return c
132 }
133
134 func (c *Pipeline) Cmds() []Cmder {
135 return c.cmds
136 }
137
View as plain text