1 package redis
2
3 import (
4 "context"
5 "time"
6 )
7
8 type StreamCmdable interface {
9 XAdd(ctx context.Context, a *XAddArgs) *StringCmd
10 XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd
11 XDel(ctx context.Context, stream string, ids ...string) *IntCmd
12 XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd
13 XLen(ctx context.Context, stream string) *IntCmd
14 XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
15 XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
16 XRevRange(ctx context.Context, stream string, start, stop string) *XMessageSliceCmd
17 XRevRangeN(ctx context.Context, stream string, start, stop string, count int64) *XMessageSliceCmd
18 XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd
19 XReadStreams(ctx context.Context, streams ...string) *XStreamSliceCmd
20 XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd
21 XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd
22 XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd
23 XGroupDestroy(ctx context.Context, stream, group string) *IntCmd
24 XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
25 XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
26 XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
27 XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
28 XPending(ctx context.Context, stream, group string) *XPendingCmd
29 XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
30 XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
31 XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
32 XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd
33 XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
34 XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
35 XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
36 XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd
37 XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd
38 XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
39 XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
40 XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd
41 XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd
42 XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
43 XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
44 XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
45 XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
46 }
47
48
49
50
51
52
53
54
// XAddArgs holds the parameters XAdd turns into an XADD command.
type XAddArgs struct {
	// Stream is the stream key.
	Stream string
	// NoMkStream adds the "nomkstream" option when true.
	NoMkStream bool
	// MaxLen selects "maxlen" trimming when greater than zero. It takes
	// precedence over MinID when both are set.
	MaxLen int64
	// MinID selects "minid" trimming when non-empty and MaxLen is not used.
	MinID string

	// Approx inserts "~" before the trimming threshold.
	Approx bool
	// Limit adds "limit <Limit>" when greater than zero.
	Limit int64
	// Mode is appended verbatim when non-empty. ID is the entry ID; XAdd
	// sends "*" when it is empty.
	Mode, ID string
	// Values holds the entry payload; XAdd expands it with appendArg.
	Values interface{}
}
67
68 func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
69 args := make([]interface{}, 0, 11)
70 args = append(args, "xadd", a.Stream)
71 if a.NoMkStream {
72 args = append(args, "nomkstream")
73 }
74 switch {
75 case a.MaxLen > 0:
76 if a.Approx {
77 args = append(args, "maxlen", "~", a.MaxLen)
78 } else {
79 args = append(args, "maxlen", a.MaxLen)
80 }
81 case a.MinID != "":
82 if a.Approx {
83 args = append(args, "minid", "~", a.MinID)
84 } else {
85 args = append(args, "minid", a.MinID)
86 }
87 }
88 if a.Limit > 0 {
89 args = append(args, "limit", a.Limit)
90 }
91
92 if a.Mode != "" {
93 args = append(args, a.Mode)
94 }
95
96 if a.ID != "" {
97 args = append(args, a.ID)
98 } else {
99 args = append(args, "*")
100 }
101 args = appendArg(args, a.Values)
102
103 cmd := NewStringCmd(ctx, args...)
104 _ = c(ctx, cmd)
105 return cmd
106 }
107
108 func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd {
109 args := []interface{}{"xackdel", stream, group, mode, "ids", len(ids)}
110 for _, id := range ids {
111 args = append(args, id)
112 }
113 cmd := NewSliceCmd(ctx, args...)
114 _ = c(ctx, cmd)
115 return cmd
116 }
117
118 func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
119 args := []interface{}{"xdel", stream}
120 for _, id := range ids {
121 args = append(args, id)
122 }
123 cmd := NewIntCmd(ctx, args...)
124 _ = c(ctx, cmd)
125 return cmd
126 }
127
128 func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd {
129 args := []interface{}{"xdelex", stream, mode, "ids", len(ids)}
130 for _, id := range ids {
131 args = append(args, id)
132 }
133 cmd := NewSliceCmd(ctx, args...)
134 _ = c(ctx, cmd)
135 return cmd
136 }
137
138 func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
139 cmd := NewIntCmd(ctx, "xlen", stream)
140 _ = c(ctx, cmd)
141 return cmd
142 }
143
144 func (c cmdable) XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd {
145 cmd := NewXMessageSliceCmd(ctx, "xrange", stream, start, stop)
146 _ = c(ctx, cmd)
147 return cmd
148 }
149
150 func (c cmdable) XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd {
151 cmd := NewXMessageSliceCmd(ctx, "xrange", stream, start, stop, "count", count)
152 _ = c(ctx, cmd)
153 return cmd
154 }
155
156 func (c cmdable) XRevRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd {
157 cmd := NewXMessageSliceCmd(ctx, "xrevrange", stream, start, stop)
158 _ = c(ctx, cmd)
159 return cmd
160 }
161
162 func (c cmdable) XRevRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd {
163 cmd := NewXMessageSliceCmd(ctx, "xrevrange", stream, start, stop, "count", count)
164 _ = c(ctx, cmd)
165 return cmd
166 }
167
// XReadArgs holds the parameters XRead turns into an XREAD command.
type XReadArgs struct {
	// Streams is appended verbatim after the "streams" keyword.
	Streams []string

	// Count adds "count <Count>" when greater than zero.
	Count int64

	// Block adds "block <milliseconds>" when it is zero or positive, and is
	// also used as the command's read timeout. A negative value omits the
	// clause. The zero value is therefore sent as "block 0".
	Block time.Duration

	// ID, when non-empty, is appended once per element of Streams after the
	// stream entries.
	ID string
}
174
175 func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
176 args := make([]interface{}, 0, 2*len(a.Streams)+6)
177 args = append(args, "xread")
178
179 keyPos := int8(1)
180 if a.Count > 0 {
181 args = append(args, "count")
182 args = append(args, a.Count)
183 keyPos += 2
184 }
185 if a.Block >= 0 {
186 args = append(args, "block")
187 args = append(args, int64(a.Block/time.Millisecond))
188 keyPos += 2
189 }
190 args = append(args, "streams")
191 keyPos++
192 for _, s := range a.Streams {
193 args = append(args, s)
194 }
195 if a.ID != "" {
196 for range a.Streams {
197 args = append(args, a.ID)
198 }
199 }
200
201 cmd := NewXStreamSliceCmd(ctx, args...)
202 if a.Block >= 0 {
203 cmd.setReadTimeout(a.Block)
204 }
205 cmd.SetFirstKeyPos(keyPos)
206 _ = c(ctx, cmd)
207 return cmd
208 }
209
210 func (c cmdable) XReadStreams(ctx context.Context, streams ...string) *XStreamSliceCmd {
211 return c.XRead(ctx, &XReadArgs{
212 Streams: streams,
213 Block: -1,
214 })
215 }
216
217 func (c cmdable) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd {
218 cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start)
219 cmd.SetFirstKeyPos(2)
220 _ = c(ctx, cmd)
221 return cmd
222 }
223
224 func (c cmdable) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd {
225 cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start, "mkstream")
226 cmd.SetFirstKeyPos(2)
227 _ = c(ctx, cmd)
228 return cmd
229 }
230
231 func (c cmdable) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd {
232 cmd := NewStatusCmd(ctx, "xgroup", "setid", stream, group, start)
233 cmd.SetFirstKeyPos(2)
234 _ = c(ctx, cmd)
235 return cmd
236 }
237
238 func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd {
239 cmd := NewIntCmd(ctx, "xgroup", "destroy", stream, group)
240 cmd.SetFirstKeyPos(2)
241 _ = c(ctx, cmd)
242 return cmd
243 }
244
245 func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
246 cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
247 cmd.SetFirstKeyPos(2)
248 _ = c(ctx, cmd)
249 return cmd
250 }
251
252 func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
253 cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
254 cmd.SetFirstKeyPos(2)
255 _ = c(ctx, cmd)
256 return cmd
257 }
258
// XReadGroupArgs holds the parameters XReadGroup turns into an XREADGROUP
// command.
type XReadGroupArgs struct {
	// Group and Consumer follow the "group" keyword, in that order.
	Group, Consumer string
	// Streams is appended verbatim after the "streams" keyword.
	Streams []string
	// Count adds "count <Count>" when greater than zero.
	Count int64
	// Block adds "block <milliseconds>" when it is zero or positive, and is
	// also used as the command's read timeout. A negative value omits the
	// clause. The zero value is therefore sent as "block 0".
	Block time.Duration
	// NoAck adds the "noack" option when true.
	NoAck bool
}
267
268 func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
269 args := make([]interface{}, 0, 10+len(a.Streams))
270 args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
271
272 keyPos := int8(4)
273 if a.Count > 0 {
274 args = append(args, "count", a.Count)
275 keyPos += 2
276 }
277 if a.Block >= 0 {
278 args = append(args, "block", int64(a.Block/time.Millisecond))
279 keyPos += 2
280 }
281 if a.NoAck {
282 args = append(args, "noack")
283 keyPos++
284 }
285 args = append(args, "streams")
286 keyPos++
287 for _, s := range a.Streams {
288 args = append(args, s)
289 }
290
291 cmd := NewXStreamSliceCmd(ctx, args...)
292 if a.Block >= 0 {
293 cmd.setReadTimeout(a.Block)
294 }
295 cmd.SetFirstKeyPos(keyPos)
296 _ = c(ctx, cmd)
297 return cmd
298 }
299
300 func (c cmdable) XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd {
301 args := []interface{}{"xack", stream, group}
302 for _, id := range ids {
303 args = append(args, id)
304 }
305 cmd := NewIntCmd(ctx, args...)
306 _ = c(ctx, cmd)
307 return cmd
308 }
309
310 func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCmd {
311 cmd := NewXPendingCmd(ctx, "xpending", stream, group)
312 _ = c(ctx, cmd)
313 return cmd
314 }
315
// XPendingExtArgs holds the parameters XPendingExt turns into the extended
// form of XPENDING.
type XPendingExtArgs struct {
	// Stream and Group follow the command name, in that order.
	Stream, Group string
	// Idle adds "idle <ms>" when non-zero; XPendingExt converts it with
	// formatMs.
	Idle time.Duration
	// Start and End are always sent, followed by Count.
	Start, End string
	// Count is always sent, even when zero.
	Count int64
	// Consumer is appended last when non-empty.
	Consumer string
}
325
326 func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd {
327 args := make([]interface{}, 0, 9)
328 args = append(args, "xpending", a.Stream, a.Group)
329 if a.Idle != 0 {
330 args = append(args, "idle", formatMs(ctx, a.Idle))
331 }
332 args = append(args, a.Start, a.End, a.Count)
333 if a.Consumer != "" {
334 args = append(args, a.Consumer)
335 }
336 cmd := NewXPendingExtCmd(ctx, args...)
337 _ = c(ctx, cmd)
338 return cmd
339 }
340
// XAutoClaimArgs holds the parameters shared by XAutoClaim and
// XAutoClaimJustID.
type XAutoClaimArgs struct {
	// Stream and Group follow the command name, in that order.
	Stream, Group string
	// MinIdle is converted with formatMs and sent after Consumer.
	MinIdle time.Duration
	// Start is sent after the idle time.
	Start string
	// Count adds "count <Count>" when greater than zero.
	Count int64
	// Consumer is sent directly after Group.
	Consumer string
}
349
350 func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd {
351 args := xAutoClaimArgs(ctx, a)
352 cmd := NewXAutoClaimCmd(ctx, args...)
353 _ = c(ctx, cmd)
354 return cmd
355 }
356
357 func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd {
358 args := xAutoClaimArgs(ctx, a)
359 args = append(args, "justid")
360 cmd := NewXAutoClaimJustIDCmd(ctx, args...)
361 _ = c(ctx, cmd)
362 return cmd
363 }
364
365 func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} {
366 args := make([]interface{}, 0, 8)
367 args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start)
368 if a.Count > 0 {
369 args = append(args, "count", a.Count)
370 }
371 return args
372 }
373
// XClaimArgs holds the parameters shared by XClaim and XClaimJustID.
type XClaimArgs struct {
	// Stream, Group and Consumer follow the command name, in that order.
	Stream, Group, Consumer string
	// MinIdle is sent in whole milliseconds (integer division).
	MinIdle time.Duration
	// Messages holds the entry IDs appended after the idle time.
	Messages []string
}
381
382 func (c cmdable) XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd {
383 args := xClaimArgs(a)
384 cmd := NewXMessageSliceCmd(ctx, args...)
385 _ = c(ctx, cmd)
386 return cmd
387 }
388
389 func (c cmdable) XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd {
390 args := xClaimArgs(a)
391 args = append(args, "justid")
392 cmd := NewStringSliceCmd(ctx, args...)
393 _ = c(ctx, cmd)
394 return cmd
395 }
396
397 func xClaimArgs(a *XClaimArgs) []interface{} {
398 args := make([]interface{}, 0, 5+len(a.Messages))
399 args = append(args,
400 "xclaim",
401 a.Stream,
402 a.Group, a.Consumer,
403 int64(a.MinIdle/time.Millisecond))
404 for _, id := range a.Messages {
405 args = append(args, id)
406 }
407 return args
408 }
409
410
411
412
413
414
415
416
417
418
419 func (c cmdable) xTrim(
420 ctx context.Context, key, strategy string,
421 approx bool, threshold interface{}, limit int64,
422 ) *IntCmd {
423 args := make([]interface{}, 0, 7)
424 args = append(args, "xtrim", key, strategy)
425 if approx {
426 args = append(args, "~")
427 }
428 args = append(args, threshold)
429 if limit > 0 {
430 args = append(args, "limit", limit)
431 }
432 cmd := NewIntCmd(ctx, args...)
433 _ = c(ctx, cmd)
434 return cmd
435 }
436
437
438
439 func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
440 return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
441 }
442
443 func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd {
444 return c.xTrim(ctx, key, "maxlen", true, maxLen, limit)
445 }
446
447 func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd {
448 return c.xTrim(ctx, key, "minid", false, minID, 0)
449 }
450
451 func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd {
452 return c.xTrim(ctx, key, "minid", true, minID, limit)
453 }
454
455 func (c cmdable) xTrimMode(
456 ctx context.Context, key, strategy string,
457 approx bool, threshold interface{}, limit int64,
458 mode string,
459 ) *IntCmd {
460 args := make([]interface{}, 0, 7)
461 args = append(args, "xtrim", key, strategy)
462 if approx {
463 args = append(args, "~")
464 }
465 args = append(args, threshold)
466 if limit > 0 {
467 args = append(args, "limit", limit)
468 }
469 args = append(args, mode)
470 cmd := NewIntCmd(ctx, args...)
471 _ = c(ctx, cmd)
472 return cmd
473 }
474
475 func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd {
476 return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode)
477 }
478
479 func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd {
480 return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode)
481 }
482
483 func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd {
484 return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode)
485 }
486
487 func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd {
488 return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode)
489 }
490
491 func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
492 cmd := NewXInfoConsumersCmd(ctx, key, group)
493 _ = c(ctx, cmd)
494 return cmd
495 }
496
497 func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd {
498 cmd := NewXInfoGroupsCmd(ctx, key)
499 _ = c(ctx, cmd)
500 return cmd
501 }
502
503 func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd {
504 cmd := NewXInfoStreamCmd(ctx, key)
505 _ = c(ctx, cmd)
506 return cmd
507 }
508
509
510
511 func (c cmdable) XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd {
512 args := make([]interface{}, 0, 6)
513 args = append(args, "xinfo", "stream", key, "full")
514 if count > 0 {
515 args = append(args, "count", count)
516 }
517 cmd := NewXInfoStreamFullCmd(ctx, args...)
518 _ = c(ctx, cmd)
519 return cmd
520 }
521
View as plain text