1 package redis
2
3 import (
4 "context"
5 "encoding"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "reflect"
11 "runtime"
12 "strings"
13 "time"
14
15 "github.com/redis/go-redis/v9/internal"
16 )
17
18
19
20
21
22
// KeepTTL is the untyped sentinel value -1.
// NOTE(review): presumably passed where an expiration is expected to ask that
// a key's existing TTL be preserved — confirm against the commands that
// consume it.
const (
	KeepTTL = -1
)
24
// usePrecise reports whether dur cannot be expressed as a whole number of
// seconds that is at least one second: it is true for anything shorter than a
// second (including zero and negative durations) and for any duration with a
// sub-second remainder.
func usePrecise(dur time.Duration) bool {
	if dur < time.Second {
		return true
	}
	return dur%time.Second != 0
}
28
29 func formatMs(ctx context.Context, dur time.Duration) int64 {
30 if dur > 0 && dur < time.Millisecond {
31 internal.Logger.Printf(
32 ctx,
33 "specified duration is %s, but minimal supported value is %s - truncating to 1ms",
34 dur, time.Millisecond,
35 )
36 return 1
37 }
38 return int64(dur / time.Millisecond)
39 }
40
41 func formatSec(ctx context.Context, dur time.Duration) int64 {
42 if dur > 0 && dur < time.Second {
43 internal.Logger.Printf(
44 ctx,
45 "specified duration is %s, but minimal supported value is %s - truncating to 1s",
46 dur, time.Second,
47 )
48 return 1
49 }
50 return int64(dur / time.Second)
51 }
52
53 func appendArgs(dst, src []interface{}) []interface{} {
54 if len(src) == 1 {
55 return appendArg(dst, src[0])
56 }
57
58 dst = append(dst, src...)
59 return dst
60 }
61
62 func appendArg(dst []interface{}, arg interface{}) []interface{} {
63 switch arg := arg.(type) {
64 case []string:
65 for _, s := range arg {
66 dst = append(dst, s)
67 }
68 return dst
69 case []interface{}:
70 dst = append(dst, arg...)
71 return dst
72 case map[string]interface{}:
73 for k, v := range arg {
74 dst = append(dst, k, v)
75 }
76 return dst
77 case map[string]string:
78 for k, v := range arg {
79 dst = append(dst, k, v)
80 }
81 return dst
82 case time.Time, time.Duration, encoding.BinaryMarshaler, net.IP:
83 return append(dst, arg)
84 case nil:
85 return dst
86 default:
87
88 v := reflect.ValueOf(arg)
89 if v.Type().Kind() == reflect.Ptr {
90 if v.IsNil() {
91
92 return dst
93 }
94 v = v.Elem()
95 }
96
97 if v.Type().Kind() == reflect.Struct {
98 return appendStructField(dst, v)
99 }
100
101 return append(dst, arg)
102 }
103 }
104
105
106 func appendStructField(dst []interface{}, v reflect.Value) []interface{} {
107 typ := v.Type()
108 for i := 0; i < typ.NumField(); i++ {
109 tag := typ.Field(i).Tag.Get("redis")
110 if tag == "" || tag == "-" {
111 continue
112 }
113 name, opt, _ := strings.Cut(tag, ",")
114 if name == "" {
115 continue
116 }
117
118 field := v.Field(i)
119
120
121 if omitEmpty(opt) && isEmptyValue(field) {
122 continue
123 }
124
125 if field.CanInterface() {
126 dst = append(dst, name, field.Interface())
127 }
128 }
129
130 return dst
131 }
132
// omitEmpty reports whether the comma-separated option list opt contains an
// element that is exactly "omitempty".
func omitEmpty(opt string) bool {
	for _, name := range strings.Split(opt, ",") {
		if name == "omitempty" {
			return true
		}
	}
	return false
}
143
// isEmptyValue reports whether v holds the "empty" value for its kind:
// false, numeric zero, a zero-length string/array/slice/map, or a nil
// pointer/interface. Among structs only time.Time is considered, and it is
// empty when reflect reports it as the zero value; every other struct, and
// every kind not listed here, is never empty.
func isEmptyValue(v reflect.Value) bool {
	switch v.Kind() {
	case reflect.Bool:
		return !v.Bool()
	case reflect.String, reflect.Slice, reflect.Map, reflect.Array:
		return v.Len() == 0
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return v.Int() == 0
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
		return v.Uint() == 0
	case reflect.Float32, reflect.Float64:
		return v.Float() == 0
	case reflect.Pointer, reflect.Interface:
		return v.IsNil()
	case reflect.Struct:
		// time.Time is the only struct type with an emptiness rule.
		return v.Type() == reflect.TypeOf(time.Time{}) && v.IsZero()
	default:
		return false
	}
}
167
168 type Cmdable interface {
169 Pipeline() Pipeliner
170 Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error)
171
172 TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error)
173 TxPipeline() Pipeliner
174
175 Command(ctx context.Context) *CommandsInfoCmd
176 CommandList(ctx context.Context, filter *FilterBy) *StringSliceCmd
177 CommandGetKeys(ctx context.Context, commands ...interface{}) *StringSliceCmd
178 CommandGetKeysAndFlags(ctx context.Context, commands ...interface{}) *KeyFlagsCmd
179 ClientGetName(ctx context.Context) *StringCmd
180 Echo(ctx context.Context, message interface{}) *StringCmd
181 Ping(ctx context.Context) *StatusCmd
182 Quit(ctx context.Context) *StatusCmd
183 Unlink(ctx context.Context, keys ...string) *IntCmd
184
185 BgRewriteAOF(ctx context.Context) *StatusCmd
186 BgSave(ctx context.Context) *StatusCmd
187 ClientKill(ctx context.Context, ipPort string) *StatusCmd
188 ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd
189 ClientList(ctx context.Context) *StringCmd
190 ClientInfo(ctx context.Context) *ClientInfoCmd
191 ClientPause(ctx context.Context, dur time.Duration) *BoolCmd
192 ClientUnpause(ctx context.Context) *BoolCmd
193 ClientID(ctx context.Context) *IntCmd
194 ClientUnblock(ctx context.Context, id int64) *IntCmd
195 ClientUnblockWithError(ctx context.Context, id int64) *IntCmd
196 ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd
197 ConfigResetStat(ctx context.Context) *StatusCmd
198 ConfigSet(ctx context.Context, parameter, value string) *StatusCmd
199 ConfigRewrite(ctx context.Context) *StatusCmd
200 DBSize(ctx context.Context) *IntCmd
201 FlushAll(ctx context.Context) *StatusCmd
202 FlushAllAsync(ctx context.Context) *StatusCmd
203 FlushDB(ctx context.Context) *StatusCmd
204 FlushDBAsync(ctx context.Context) *StatusCmd
205 Info(ctx context.Context, section ...string) *StringCmd
206 LastSave(ctx context.Context) *IntCmd
207 Save(ctx context.Context) *StatusCmd
208 Shutdown(ctx context.Context) *StatusCmd
209 ShutdownSave(ctx context.Context) *StatusCmd
210 ShutdownNoSave(ctx context.Context) *StatusCmd
211 SlaveOf(ctx context.Context, host, port string) *StatusCmd
212 SlowLogGet(ctx context.Context, num int64) *SlowLogCmd
213 Time(ctx context.Context) *TimeCmd
214 DebugObject(ctx context.Context, key string) *StringCmd
215 MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd
216
217 ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd
218
219 ACLCmdable
220 BitMapCmdable
221 ClusterCmdable
222 GenericCmdable
223 GeoCmdable
224 HashCmdable
225 HyperLogLogCmdable
226 ListCmdable
227 ProbabilisticCmdable
228 PubSubCmdable
229 ScriptingFunctionsCmdable
230 SearchCmdable
231 SetCmdable
232 SortedSetCmdable
233 StringCmdable
234 StreamCmdable
235 TimeseriesCmdable
236 JSONCmdable
237 VectorSetCmdable
238 }
239
240 type StatefulCmdable interface {
241 Cmdable
242 Auth(ctx context.Context, password string) *StatusCmd
243 AuthACL(ctx context.Context, username, password string) *StatusCmd
244 Select(ctx context.Context, index int) *StatusCmd
245 SwapDB(ctx context.Context, index1, index2 int) *StatusCmd
246 ClientSetName(ctx context.Context, name string) *BoolCmd
247 ClientSetInfo(ctx context.Context, info LibraryInfo) *StatusCmd
248 Hello(ctx context.Context, ver int, username, password, clientName string) *MapStringInterfaceCmd
249 }
250
251 var (
252 _ Cmdable = (*Client)(nil)
253 _ Cmdable = (*Tx)(nil)
254 _ Cmdable = (*Ring)(nil)
255 _ Cmdable = (*ClusterClient)(nil)
256 _ Cmdable = (*Pipeline)(nil)
257 )
258
259 type cmdable func(ctx context.Context, cmd Cmder) error
260
261 type statefulCmdable func(ctx context.Context, cmd Cmder) error
262
263
264
265 func (c statefulCmdable) Auth(ctx context.Context, password string) *StatusCmd {
266 cmd := NewStatusCmd(ctx, "auth", password)
267 _ = c(ctx, cmd)
268 return cmd
269 }
270
271
272
273
274 func (c statefulCmdable) AuthACL(ctx context.Context, username, password string) *StatusCmd {
275 cmd := NewStatusCmd(ctx, "auth", username, password)
276 _ = c(ctx, cmd)
277 return cmd
278 }
279
280 func (c cmdable) Wait(ctx context.Context, numSlaves int, timeout time.Duration) *IntCmd {
281 cmd := NewIntCmd(ctx, "wait", numSlaves, int(timeout/time.Millisecond))
282 cmd.setReadTimeout(timeout)
283 _ = c(ctx, cmd)
284 return cmd
285 }
286
287 func (c cmdable) WaitAOF(ctx context.Context, numLocal, numSlaves int, timeout time.Duration) *IntCmd {
288 cmd := NewIntCmd(ctx, "waitAOF", numLocal, numSlaves, int(timeout/time.Millisecond))
289 cmd.setReadTimeout(timeout)
290 _ = c(ctx, cmd)
291 return cmd
292 }
293
294 func (c statefulCmdable) Select(ctx context.Context, index int) *StatusCmd {
295 cmd := NewStatusCmd(ctx, "select", index)
296 _ = c(ctx, cmd)
297 return cmd
298 }
299
300 func (c statefulCmdable) SwapDB(ctx context.Context, index1, index2 int) *StatusCmd {
301 cmd := NewStatusCmd(ctx, "swapdb", index1, index2)
302 _ = c(ctx, cmd)
303 return cmd
304 }
305
306
307 func (c statefulCmdable) ClientSetName(ctx context.Context, name string) *BoolCmd {
308 cmd := NewBoolCmd(ctx, "client", "setname", name)
309 _ = c(ctx, cmd)
310 return cmd
311 }
312
313
314 func (c statefulCmdable) ClientSetInfo(ctx context.Context, info LibraryInfo) *StatusCmd {
315 err := info.Validate()
316 if err != nil {
317 panic(err.Error())
318 }
319
320 var cmd *StatusCmd
321 if info.LibName != nil {
322 libName := fmt.Sprintf("go-redis(%s,%s)", *info.LibName, internal.ReplaceSpaces(runtime.Version()))
323 cmd = NewStatusCmd(ctx, "client", "setinfo", "LIB-NAME", libName)
324 } else {
325 cmd = NewStatusCmd(ctx, "client", "setinfo", "LIB-VER", *info.LibVer)
326 }
327
328 _ = c(ctx, cmd)
329 return cmd
330 }
331
332
333 func (info LibraryInfo) Validate() error {
334 if info.LibName != nil && info.LibVer != nil {
335 return errors.New("both LibName and LibVer cannot be set at the same time")
336 }
337 if info.LibName == nil && info.LibVer == nil {
338 return errors.New("at least one of LibName and LibVer should be set")
339 }
340 return nil
341 }
342
343
344 func (c statefulCmdable) Hello(ctx context.Context,
345 ver int, username, password, clientName string,
346 ) *MapStringInterfaceCmd {
347 args := make([]interface{}, 0, 7)
348 args = append(args, "hello", ver)
349 if password != "" {
350 if username != "" {
351 args = append(args, "auth", username, password)
352 } else {
353 args = append(args, "auth", "default", password)
354 }
355 }
356 if clientName != "" {
357 args = append(args, "setname", clientName)
358 }
359 cmd := NewMapStringInterfaceCmd(ctx, args...)
360 _ = c(ctx, cmd)
361 return cmd
362 }
363
364
365
366 func (c cmdable) Command(ctx context.Context) *CommandsInfoCmd {
367 cmd := NewCommandsInfoCmd(ctx, "command")
368 _ = c(ctx, cmd)
369 return cmd
370 }
371
372
// FilterBy selects the filter CommandList applies. CommandList uses only the
// first non-empty field, checked in the order Module, ACLCat, Pattern; if all
// are empty no filter is sent.
type FilterBy struct {
	// Module is sent as "filterby module <Module>".
	Module string
	// ACLCat is sent as "filterby aclcat <ACLCat>".
	ACLCat string
	// Pattern is sent as "filterby pattern <Pattern>".
	Pattern string
}
378
379 func (c cmdable) CommandList(ctx context.Context, filter *FilterBy) *StringSliceCmd {
380 args := make([]interface{}, 0, 5)
381 args = append(args, "command", "list")
382 if filter != nil {
383 if filter.Module != "" {
384 args = append(args, "filterby", "module", filter.Module)
385 } else if filter.ACLCat != "" {
386 args = append(args, "filterby", "aclcat", filter.ACLCat)
387 } else if filter.Pattern != "" {
388 args = append(args, "filterby", "pattern", filter.Pattern)
389 }
390 }
391 cmd := NewStringSliceCmd(ctx, args...)
392 _ = c(ctx, cmd)
393 return cmd
394 }
395
396 func (c cmdable) CommandGetKeys(ctx context.Context, commands ...interface{}) *StringSliceCmd {
397 args := make([]interface{}, 2+len(commands))
398 args[0] = "command"
399 args[1] = "getkeys"
400 copy(args[2:], commands)
401 cmd := NewStringSliceCmd(ctx, args...)
402 _ = c(ctx, cmd)
403 return cmd
404 }
405
406 func (c cmdable) CommandGetKeysAndFlags(ctx context.Context, commands ...interface{}) *KeyFlagsCmd {
407 args := make([]interface{}, 2+len(commands))
408 args[0] = "command"
409 args[1] = "getkeysandflags"
410 copy(args[2:], commands)
411 cmd := NewKeyFlagsCmd(ctx, args...)
412 _ = c(ctx, cmd)
413 return cmd
414 }
415
416
417 func (c cmdable) ClientGetName(ctx context.Context) *StringCmd {
418 cmd := NewStringCmd(ctx, "client", "getname")
419 _ = c(ctx, cmd)
420 return cmd
421 }
422
423 func (c cmdable) Echo(ctx context.Context, message interface{}) *StringCmd {
424 cmd := NewStringCmd(ctx, "echo", message)
425 _ = c(ctx, cmd)
426 return cmd
427 }
428
429 func (c cmdable) Ping(ctx context.Context) *StatusCmd {
430 cmd := NewStatusCmd(ctx, "ping")
431 _ = c(ctx, cmd)
432 return cmd
433 }
434
435 func (c cmdable) Do(ctx context.Context, args ...interface{}) *Cmd {
436 cmd := NewCmd(ctx, args...)
437 _ = c(ctx, cmd)
438 return cmd
439 }
440
441 func (c cmdable) Quit(_ context.Context) *StatusCmd {
442 panic("not implemented")
443 }
444
445
446
447 func (c cmdable) BgRewriteAOF(ctx context.Context) *StatusCmd {
448 cmd := NewStatusCmd(ctx, "bgrewriteaof")
449 _ = c(ctx, cmd)
450 return cmd
451 }
452
453 func (c cmdable) BgSave(ctx context.Context) *StatusCmd {
454 cmd := NewStatusCmd(ctx, "bgsave")
455 _ = c(ctx, cmd)
456 return cmd
457 }
458
459 func (c cmdable) ClientKill(ctx context.Context, ipPort string) *StatusCmd {
460 cmd := NewStatusCmd(ctx, "client", "kill", ipPort)
461 _ = c(ctx, cmd)
462 return cmd
463 }
464
465
466
467
468 func (c cmdable) ClientKillByFilter(ctx context.Context, keys ...string) *IntCmd {
469 args := make([]interface{}, 2+len(keys))
470 args[0] = "client"
471 args[1] = "kill"
472 for i, key := range keys {
473 args[2+i] = key
474 }
475 cmd := NewIntCmd(ctx, args...)
476 _ = c(ctx, cmd)
477 return cmd
478 }
479
480 func (c cmdable) ClientList(ctx context.Context) *StringCmd {
481 cmd := NewStringCmd(ctx, "client", "list")
482 _ = c(ctx, cmd)
483 return cmd
484 }
485
486 func (c cmdable) ClientPause(ctx context.Context, dur time.Duration) *BoolCmd {
487 cmd := NewBoolCmd(ctx, "client", "pause", formatMs(ctx, dur))
488 _ = c(ctx, cmd)
489 return cmd
490 }
491
492 func (c cmdable) ClientUnpause(ctx context.Context) *BoolCmd {
493 cmd := NewBoolCmd(ctx, "client", "unpause")
494 _ = c(ctx, cmd)
495 return cmd
496 }
497
498 func (c cmdable) ClientID(ctx context.Context) *IntCmd {
499 cmd := NewIntCmd(ctx, "client", "id")
500 _ = c(ctx, cmd)
501 return cmd
502 }
503
504 func (c cmdable) ClientUnblock(ctx context.Context, id int64) *IntCmd {
505 cmd := NewIntCmd(ctx, "client", "unblock", id)
506 _ = c(ctx, cmd)
507 return cmd
508 }
509
510 func (c cmdable) ClientUnblockWithError(ctx context.Context, id int64) *IntCmd {
511 cmd := NewIntCmd(ctx, "client", "unblock", id, "error")
512 _ = c(ctx, cmd)
513 return cmd
514 }
515
516 func (c cmdable) ClientInfo(ctx context.Context) *ClientInfoCmd {
517 cmd := NewClientInfoCmd(ctx, "client", "info")
518 _ = c(ctx, cmd)
519 return cmd
520 }
521
522
523
524 func (c cmdable) ConfigGet(ctx context.Context, parameter string) *MapStringStringCmd {
525 cmd := NewMapStringStringCmd(ctx, "config", "get", parameter)
526 _ = c(ctx, cmd)
527 return cmd
528 }
529
530 func (c cmdable) ConfigResetStat(ctx context.Context) *StatusCmd {
531 cmd := NewStatusCmd(ctx, "config", "resetstat")
532 _ = c(ctx, cmd)
533 return cmd
534 }
535
536 func (c cmdable) ConfigSet(ctx context.Context, parameter, value string) *StatusCmd {
537 cmd := NewStatusCmd(ctx, "config", "set", parameter, value)
538 _ = c(ctx, cmd)
539 return cmd
540 }
541
542 func (c cmdable) ConfigRewrite(ctx context.Context) *StatusCmd {
543 cmd := NewStatusCmd(ctx, "config", "rewrite")
544 _ = c(ctx, cmd)
545 return cmd
546 }
547
548 func (c cmdable) DBSize(ctx context.Context) *IntCmd {
549 cmd := NewIntCmd(ctx, "dbsize")
550 _ = c(ctx, cmd)
551 return cmd
552 }
553
554 func (c cmdable) FlushAll(ctx context.Context) *StatusCmd {
555 cmd := NewStatusCmd(ctx, "flushall")
556 _ = c(ctx, cmd)
557 return cmd
558 }
559
560 func (c cmdable) FlushAllAsync(ctx context.Context) *StatusCmd {
561 cmd := NewStatusCmd(ctx, "flushall", "async")
562 _ = c(ctx, cmd)
563 return cmd
564 }
565
566 func (c cmdable) FlushDB(ctx context.Context) *StatusCmd {
567 cmd := NewStatusCmd(ctx, "flushdb")
568 _ = c(ctx, cmd)
569 return cmd
570 }
571
572 func (c cmdable) FlushDBAsync(ctx context.Context) *StatusCmd {
573 cmd := NewStatusCmd(ctx, "flushdb", "async")
574 _ = c(ctx, cmd)
575 return cmd
576 }
577
578 func (c cmdable) Info(ctx context.Context, sections ...string) *StringCmd {
579 args := make([]interface{}, 1+len(sections))
580 args[0] = "info"
581 for i, section := range sections {
582 args[i+1] = section
583 }
584 cmd := NewStringCmd(ctx, args...)
585 _ = c(ctx, cmd)
586 return cmd
587 }
588
589 func (c cmdable) InfoMap(ctx context.Context, sections ...string) *InfoCmd {
590 args := make([]interface{}, 1+len(sections))
591 args[0] = "info"
592 for i, section := range sections {
593 args[i+1] = section
594 }
595 cmd := NewInfoCmd(ctx, args...)
596 _ = c(ctx, cmd)
597 return cmd
598 }
599
600 func (c cmdable) LastSave(ctx context.Context) *IntCmd {
601 cmd := NewIntCmd(ctx, "lastsave")
602 _ = c(ctx, cmd)
603 return cmd
604 }
605
606 func (c cmdable) Save(ctx context.Context) *StatusCmd {
607 cmd := NewStatusCmd(ctx, "save")
608 _ = c(ctx, cmd)
609 return cmd
610 }
611
612 func (c cmdable) shutdown(ctx context.Context, modifier string) *StatusCmd {
613 var args []interface{}
614 if modifier == "" {
615 args = []interface{}{"shutdown"}
616 } else {
617 args = []interface{}{"shutdown", modifier}
618 }
619 cmd := NewStatusCmd(ctx, args...)
620 _ = c(ctx, cmd)
621 if err := cmd.Err(); err != nil {
622 if err == io.EOF {
623
624 cmd.err = nil
625 }
626 } else {
627
628 cmd.err = errors.New(cmd.val)
629 cmd.val = ""
630 }
631 return cmd
632 }
633
634 func (c cmdable) Shutdown(ctx context.Context) *StatusCmd {
635 return c.shutdown(ctx, "")
636 }
637
638 func (c cmdable) ShutdownSave(ctx context.Context) *StatusCmd {
639 return c.shutdown(ctx, "save")
640 }
641
642 func (c cmdable) ShutdownNoSave(ctx context.Context) *StatusCmd {
643 return c.shutdown(ctx, "nosave")
644 }
645
646 func (c cmdable) SlaveOf(ctx context.Context, host, port string) *StatusCmd {
647 cmd := NewStatusCmd(ctx, "slaveof", host, port)
648 _ = c(ctx, cmd)
649 return cmd
650 }
651
652 func (c cmdable) SlowLogGet(ctx context.Context, num int64) *SlowLogCmd {
653 cmd := NewSlowLogCmd(context.Background(), "slowlog", "get", num)
654 _ = c(ctx, cmd)
655 return cmd
656 }
657
658 func (c cmdable) Sync(_ context.Context) {
659 panic("not implemented")
660 }
661
662 func (c cmdable) Time(ctx context.Context) *TimeCmd {
663 cmd := NewTimeCmd(ctx, "time")
664 _ = c(ctx, cmd)
665 return cmd
666 }
667
668 func (c cmdable) DebugObject(ctx context.Context, key string) *StringCmd {
669 cmd := NewStringCmd(ctx, "debug", "object", key)
670 _ = c(ctx, cmd)
671 return cmd
672 }
673
674 func (c cmdable) MemoryUsage(ctx context.Context, key string, samples ...int) *IntCmd {
675 args := []interface{}{"memory", "usage", key}
676 if len(samples) > 0 {
677 if len(samples) != 1 {
678 panic("MemoryUsage expects single sample count")
679 }
680 args = append(args, "SAMPLES", samples[0])
681 }
682 cmd := NewIntCmd(ctx, args...)
683 cmd.SetFirstKeyPos(2)
684 _ = c(ctx, cmd)
685 return cmd
686 }
687
688
689
690
691
// ModuleLoadexConfig describes a MODULE LOADEX invocation.
type ModuleLoadexConfig struct {
	// Path is sent as the module path argument.
	Path string
	// Conf holds configuration pairs; each entry is sent as
	// "CONFIG <name> <value>". Map iteration order decides their order.
	Conf map[string]interface{}
	// Args holds the module arguments, sent after a single "ARGS" keyword.
	Args []interface{}
}

// toArgs renders c as the argument list
//
//	MODULE LOADEX <Path> [CONFIG <name> <value> ...] [ARGS <arg> ...]
//
// The ARGS keyword is emitted once, followed by every element of Args in
// order, and is omitted entirely when Args is empty. MODULE LOADEX treats
// everything after ARGS as module arguments, so repeating the keyword before
// each argument would hand the module literal "ARGS" strings.
func (c *ModuleLoadexConfig) toArgs() []interface{} {
	size := 3 + 3*len(c.Conf)
	if len(c.Args) > 0 {
		size += 1 + len(c.Args)
	}

	args := make([]interface{}, 3, size)
	args[0] = "MODULE"
	args[1] = "LOADEX"
	args[2] = c.Path
	for k, v := range c.Conf {
		args = append(args, "CONFIG", k, v)
	}
	if len(c.Args) > 0 {
		args = append(args, "ARGS")
		args = append(args, c.Args...)
	}
	return args
}
711
712
713 func (c cmdable) ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd {
714 cmd := NewStringCmd(ctx, conf.toArgs()...)
715 _ = c(ctx, cmd)
716 return cmd
717 }
718
719
730 func (c cmdable) Monitor(ctx context.Context, ch chan string) *MonitorCmd {
731 cmd := newMonitorCmd(ctx, ch)
732 _ = c(ctx, cmd)
733 return cmd
734 }
735
View as plain text