1 package redis
2
3 import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "math"
8 "net"
9 "net/url"
10 "runtime"
11 "sort"
12 "strings"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "github.com/redis/go-redis/v9/auth"
18 "github.com/redis/go-redis/v9/internal"
19 "github.com/redis/go-redis/v9/internal/hashtag"
20 "github.com/redis/go-redis/v9/internal/pool"
21 "github.com/redis/go-redis/v9/internal/proto"
22 "github.com/redis/go-redis/v9/internal/rand"
23 )
24
25 const (
26 minLatencyMeasurementInterval = 10 * time.Second
27 )
28
29 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
30
31 // ClusterOptions are used to configure a cluster client and should be
32 // passed to NewClusterClient.
33 type ClusterOptions struct {
34 // Addrs is a seed list of host:port addresses of cluster nodes.
35 Addrs []string
36
37 // ClientName will execute the `CLIENT SETNAME ClientName` command for each connection.
38 ClientName string
39
40 // NewClient creates a cluster node client with the provided options.
41 NewClient func(opt *Options) *Client
42
43 // MaxRedirects is the maximum number of retries before giving up.
44 // Commands are retried on network errors and MOVED/ASK redirects.
45 // Default is 3 retries; -1 disables redirects.
46 MaxRedirects int
47
48 // ReadOnly enables read-only commands on replica nodes.
49 ReadOnly bool
50
51 // RouteByLatency routes read-only commands to the closest node. It automatically enables ReadOnly.
52 RouteByLatency bool
53
54 // RouteRandomly routes read-only commands to a random node. It automatically enables ReadOnly.
55 RouteRandomly bool
56
57 // ClusterSlots is an optional function that returns cluster slots information.
58 // It is useful for manually creating a cluster of standard Redis servers and
59 // load-balancing read/write operations between the master and replicas.
60 // It can use a service like ZooKeeper to maintain configuration information
61 // and ReloadState to manually trigger state reloading.
62 ClusterSlots func(context.Context) ([]ClusterSlot, error)
63
64 // Dialer is an optional function used to create new network connections to
65 // cluster nodes.
66 Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
67
68 OnConnect func(ctx context.Context, cn *Conn) error
69
70 Protocol int
71 Username string
72 Password string
73 CredentialsProvider func() (username string, password string)
74 CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)
75 StreamingCredentialsProvider auth.StreamingCredentialsProvider
76
77 MaxRetries int
78 MinRetryBackoff time.Duration
79 MaxRetryBackoff time.Duration
80
81 DialTimeout time.Duration
82 ReadTimeout time.Duration
83 WriteTimeout time.Duration
84 ContextTimeoutEnabled bool
85
86 PoolFIFO bool
87 PoolSize int
88 PoolTimeout time.Duration
89 MinIdleConns int
90 MaxIdleConns int
91 MaxActiveConns int
92 ConnMaxIdleTime time.Duration
93 ConnMaxLifetime time.Duration
94
95
96
97
98 // ReadBufferSize specifies the size of the read buffer used for each
99 // connection. If zero, proto.DefaultBufferSize is used.
100 ReadBufferSize int
101
102
103
104 // WriteBufferSize specifies the size of the write buffer used for each
105 // connection. If zero, proto.DefaultBufferSize is used.
106
107 WriteBufferSize int
108 // TLSConfig, when set, enables TLS on all connections to cluster nodes.
109 TLSConfig *tls.Config
110
111 // DisableIndentity is kept for backwards compatibility with the original,
112 // misspelled field name.
113 //
114 // Deprecated: use DisableIdentity instead.
115 DisableIndentity bool
116
117 // DisableIdentity disables the CLIENT SETINFO command that would otherwise
118 // report the library name and version on each new connection.
119 // Default is false.
120 DisableIdentity bool
121
122 // IdentitySuffix is an optional suffix appended to the reported client identity.
123 IdentitySuffix string
124
125 // UnstableResp3 enables unstable mode for Redis Search module commands when
126 // using the RESP3 protocol.
127 UnstableResp3 bool
128
129 // FailingTimeoutSeconds is how long, in seconds, a node stays marked as failing
130 // after MarkAsFailing before it is treated as healthy again. Default is 15 seconds.
131 FailingTimeoutSeconds int
132 }
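
// exampleClusterSlotsOptions is a minimal sketch of the ClusterSlots hook: it serves
// a static slot map covering all 16384 slots with a single placeholder node address,
// so routing works without issuing CLUSTER SLOTS. The address is an assumption, not
// part of the library.
func exampleClusterSlotsOptions() *ClusterOptions {
	return &ClusterOptions{
		ClusterSlots: func(ctx context.Context) ([]ClusterSlot, error) {
			// One slot range owned by a single placeholder master.
			return []ClusterSlot{{
				Start: 0,
				End:   16383,
				Nodes: []ClusterNode{{Addr: ":6379"}},
			}}, nil
		},
		RouteRandomly: true,
	}
}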
133
134 func (opt *ClusterOptions) init() {
135 switch opt.MaxRedirects {
136 case -1:
137 opt.MaxRedirects = 0
138 case 0:
139 opt.MaxRedirects = 3
140 }
141
142 if opt.RouteByLatency || opt.RouteRandomly {
143 opt.ReadOnly = true
144 }
145
146 if opt.PoolSize == 0 {
147 opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
148 }
149 if opt.ReadBufferSize == 0 {
150 opt.ReadBufferSize = proto.DefaultBufferSize
151 }
152 if opt.WriteBufferSize == 0 {
153 opt.WriteBufferSize = proto.DefaultBufferSize
154 }
155
156 switch opt.ReadTimeout {
157 case -1:
158 opt.ReadTimeout = 0
159 case 0:
160 opt.ReadTimeout = 3 * time.Second
161 }
162 switch opt.WriteTimeout {
163 case -1:
164 opt.WriteTimeout = 0
165 case 0:
166 opt.WriteTimeout = opt.ReadTimeout
167 }
168
169 if opt.MaxRetries == 0 {
170 opt.MaxRetries = -1
171 }
172 switch opt.MinRetryBackoff {
173 case -1:
174 opt.MinRetryBackoff = 0
175 case 0:
176 opt.MinRetryBackoff = 8 * time.Millisecond
177 }
178 switch opt.MaxRetryBackoff {
179 case -1:
180 opt.MaxRetryBackoff = 0
181 case 0:
182 opt.MaxRetryBackoff = 512 * time.Millisecond
183 }
184
185 if opt.NewClient == nil {
186 opt.NewClient = NewClient
187 }
188
189 if opt.FailingTimeoutSeconds == 0 {
190 opt.FailingTimeoutSeconds = 15
191 }
192 }
193
194 // ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
195 // The URL must be in the form:
196 //
197 // redis://<user>:<password>@<host>:<port>
198 // or
199 // rediss://<user>:<password>@<host>:<port>
200 //
201 // To add additional addresses, specify the query parameter "addr" one or more times, e.g.:
202 //
203 // redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
204 //
205 // Most ClusterOptions fields can be set using query parameters, with the following restrictions:
206 //   - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
207 //   - only scalar type fields are supported (bool, int, time.Duration)
208 //   - for time.Duration fields, values must be a valid input for time.ParseDuration();
209 //     additionally a plain integer as value (i.e. without unit) is interpreted as seconds
210 //   - to disable a duration field, use a value less than or equal to 0; to use the default
211 //     value, leave the value blank or remove the parameter
212 //   - only the last value is interpreted if a parameter is given multiple times
213 //   - fields "network", "addr", "username" and "password" can only be set using other
214 //     URL attributes (scheme, host, userinfo, resp.); query parameters using these
215 //     names will be treated as unknown parameters
216 //   - unknown parameter names will result in an error
217 //
218 // Example:
219 //
220 // redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790
221 // is parsed as DialTimeout=3s, ReadTimeout=6s, Addrs=[localhost:6789 localhost:6790]
229 func ParseClusterURL(redisURL string) (*ClusterOptions, error) {
230 o := &ClusterOptions{}
231
232 u, err := url.Parse(redisURL)
233 if err != nil {
234 return nil, err
235 }
236
237
238
239 h, p := getHostPortWithDefaults(u)
240 o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
241
242
243 o, err = setupClusterConn(u, h, o)
244 if err != nil {
245 return nil, err
246 }
247
248 return o, nil
249 }
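
// exampleParseClusterURL is a minimal sketch of ParseClusterURL; the URL, credentials
// and extra addr parameter below are placeholders.
func exampleParseClusterURL() (*ClusterOptions, error) {
	opt, err := ParseClusterURL("rediss://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790")
	if err != nil {
		return nil, err
	}
	// dial_timeout=3 is interpreted as seconds, read_timeout=6s as a duration.
	fmt.Println(opt.Addrs, opt.DialTimeout, opt.ReadTimeout)
	return opt, nil
}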
250
251 // setupClusterConn applies the URL scheme (TLS for rediss) and user info to o.
252 func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptions, error) {
253 switch u.Scheme {
254 case "rediss":
255 o.TLSConfig = &tls.Config{ServerName: host}
256 fallthrough
257 case "redis":
258 o.Username, o.Password = getUserPassword(u)
259 default:
260 return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
261 }
262
263
264 o, err := setupClusterQueryParams(u, o)
265 if err != nil {
266 return nil, err
267 }
268
269 return o, nil
270 }
271
272 // setupClusterQueryParams converts query parameters in u to option values in o.
273 func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
274 q := queryOptions{q: u.Query()}
275
276 o.Protocol = q.int("protocol")
277 o.ClientName = q.string("client_name")
278 o.MaxRedirects = q.int("max_redirects")
279 o.ReadOnly = q.bool("read_only")
280 o.RouteByLatency = q.bool("route_by_latency")
281 o.RouteRandomly = q.bool("route_randomly")
282 o.MaxRetries = q.int("max_retries")
283 o.MinRetryBackoff = q.duration("min_retry_backoff")
284 o.MaxRetryBackoff = q.duration("max_retry_backoff")
285 o.DialTimeout = q.duration("dial_timeout")
286 o.ReadTimeout = q.duration("read_timeout")
287 o.WriteTimeout = q.duration("write_timeout")
288 o.PoolFIFO = q.bool("pool_fifo")
289 o.PoolSize = q.int("pool_size")
290 o.MinIdleConns = q.int("min_idle_conns")
291 o.MaxIdleConns = q.int("max_idle_conns")
292 o.MaxActiveConns = q.int("max_active_conns")
293 o.PoolTimeout = q.duration("pool_timeout")
294 o.ConnMaxLifetime = q.duration("conn_max_lifetime")
295 o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
296 o.FailingTimeoutSeconds = q.int("failing_timeout_seconds")
297
298 if q.err != nil {
299 return nil, q.err
300 }
301
302
303 addrs := q.strings("addr")
304 for _, addr := range addrs {
305 h, p, err := net.SplitHostPort(addr)
306 if err != nil || h == "" || p == "" {
307 return nil, fmt.Errorf("redis: unable to parse addr param: %s", addr)
308 }
309
310 o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
311 }
312
313
314 if r := q.remaining(); len(r) > 0 {
315 return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
316 }
317
318 return o, nil
319 }
320
321 func (opt *ClusterOptions) clientOptions() *Options {
322 return &Options{
323 ClientName: opt.ClientName,
324 Dialer: opt.Dialer,
325 OnConnect: opt.OnConnect,
326
327 Protocol: opt.Protocol,
328 Username: opt.Username,
329 Password: opt.Password,
330 CredentialsProvider: opt.CredentialsProvider,
331 CredentialsProviderContext: opt.CredentialsProviderContext,
332 StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
333
334 MaxRetries: opt.MaxRetries,
335 MinRetryBackoff: opt.MinRetryBackoff,
336 MaxRetryBackoff: opt.MaxRetryBackoff,
337
338 DialTimeout: opt.DialTimeout,
339 ReadTimeout: opt.ReadTimeout,
340 WriteTimeout: opt.WriteTimeout,
341 ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
342
343 PoolFIFO: opt.PoolFIFO,
344 PoolSize: opt.PoolSize,
345 PoolTimeout: opt.PoolTimeout,
346 MinIdleConns: opt.MinIdleConns,
347 MaxIdleConns: opt.MaxIdleConns,
348 MaxActiveConns: opt.MaxActiveConns,
349 ConnMaxIdleTime: opt.ConnMaxIdleTime,
350 ConnMaxLifetime: opt.ConnMaxLifetime,
351 ReadBufferSize: opt.ReadBufferSize,
352 WriteBufferSize: opt.WriteBufferSize,
353 DisableIdentity: opt.DisableIdentity,
354 DisableIndentity: opt.DisableIdentity,
355 IdentitySuffix: opt.IdentitySuffix,
356 FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
357 TLSConfig: opt.TLSConfig,
358
359
360
361
362
363 readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
364 UnstableResp3: opt.UnstableResp3,
365 }
366 }
367
368
369
370 type clusterNode struct {
371 Client *Client
372
373 latency uint32
374 generation uint32
375 failing uint32
376 loaded uint32
377
378 // lastLatencyMeasurement is the time of the last latency probe, in nanoseconds since the Unix epoch.
379 lastLatencyMeasurement int64
380 }
381
382 func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
383 opt := clOpt.clientOptions()
384 opt.Addr = addr
385 node := clusterNode{
386 Client: clOpt.NewClient(opt),
387 }
388
389 node.latency = math.MaxUint32
390 if clOpt.RouteByLatency {
391 go node.updateLatency()
392 }
393
394 return &node
395 }
396
397 func (n *clusterNode) String() string {
398 return n.Client.String()
399 }
400
401 func (n *clusterNode) Close() error {
402 return n.Client.Close()
403 }
404
405 const maximumNodeLatency = 1 * time.Minute
406
407 func (n *clusterNode) updateLatency() {
408 const numProbe = 10
409 var dur uint64
410
411 successes := 0
412 for i := 0; i < numProbe; i++ {
413 time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
414
415 start := time.Now()
416 err := n.Client.Ping(context.TODO()).Err()
417 if err == nil {
418 dur += uint64(time.Since(start) / time.Microsecond)
419 successes++
420 }
421 }
422
423 var latency float64
424 if successes == 0 {
425 // If none of the probes succeeded, assign the maximum latency so this node
426 // is the least preferred by latency-based routing.
427 latency = float64(maximumNodeLatency / time.Microsecond)
428 } else {
429 latency = float64(dur) / float64(successes)
430 }
431 atomic.StoreUint32(&n.latency, uint32(latency+0.5))
432 n.SetLastLatencyMeasurement(time.Now())
433 }
434
435 func (n *clusterNode) Latency() time.Duration {
436 latency := atomic.LoadUint32(&n.latency)
437 return time.Duration(latency) * time.Microsecond
438 }
439
440 func (n *clusterNode) MarkAsFailing() {
441 atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
442 atomic.StoreUint32(&n.loaded, 0)
443 }
444
445 func (n *clusterNode) Failing() bool {
446 timeout := int64(n.Client.opt.FailingTimeoutSeconds)
447
448 failing := atomic.LoadUint32(&n.failing)
449 if failing == 0 {
450 return false
451 }
452 if time.Now().Unix()-int64(failing) < timeout {
453 return true
454 }
455 atomic.StoreUint32(&n.failing, 0)
456 return false
457 }
458
459 func (n *clusterNode) Generation() uint32 {
460 return atomic.LoadUint32(&n.generation)
461 }
462
463 func (n *clusterNode) LastLatencyMeasurement() int64 {
464 return atomic.LoadInt64(&n.lastLatencyMeasurement)
465 }
466
467 func (n *clusterNode) SetGeneration(gen uint32) {
468 for {
469 v := atomic.LoadUint32(&n.generation)
470 if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
471 break
472 }
473 }
474 }
475
476 func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
477 for {
478 v := atomic.LoadInt64(&n.lastLatencyMeasurement)
479 if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
480 break
481 }
482 }
483 }
484
485 func (n *clusterNode) Loading() bool {
486 loaded := atomic.LoadUint32(&n.loaded)
487 if loaded == 1 {
488 return false
489 }
490
491 // Probe the node with a short PING; a LOADING error means it is still loading its dataset.
492 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
493 defer cancel()
494
495 err := n.Client.Ping(ctx).Err()
496 loading := err != nil && isLoadingError(err)
497 if !loading {
498 atomic.StoreUint32(&n.loaded, 1)
499 }
500 return loading
501 }
502
503
504
505 type clusterNodes struct {
506 opt *ClusterOptions
507
508 mu sync.RWMutex
509 addrs []string
510 nodes map[string]*clusterNode
511 activeAddrs []string
512 closed bool
513 onNewNode []func(rdb *Client)
514
515 generation uint32
516 }
517
518 func newClusterNodes(opt *ClusterOptions) *clusterNodes {
519 return &clusterNodes{
520 opt: opt,
521 addrs: opt.Addrs,
522 nodes: make(map[string]*clusterNode),
523 }
524 }
525
526 func (c *clusterNodes) Close() error {
527 c.mu.Lock()
528 defer c.mu.Unlock()
529
530 if c.closed {
531 return nil
532 }
533 c.closed = true
534
535 var firstErr error
536 for _, node := range c.nodes {
537 if err := node.Client.Close(); err != nil && firstErr == nil {
538 firstErr = err
539 }
540 }
541
542 c.nodes = nil
543 c.activeAddrs = nil
544
545 return firstErr
546 }
547
548 func (c *clusterNodes) OnNewNode(fn func(rdb *Client)) {
549 c.mu.Lock()
550 c.onNewNode = append(c.onNewNode, fn)
551 c.mu.Unlock()
552 }
553
554 func (c *clusterNodes) Addrs() ([]string, error) {
555 var addrs []string
556
557 c.mu.RLock()
558 closed := c.closed
559 if !closed {
560 if len(c.activeAddrs) > 0 {
561 addrs = make([]string, len(c.activeAddrs))
562 copy(addrs, c.activeAddrs)
563 } else {
564 addrs = make([]string, len(c.addrs))
565 copy(addrs, c.addrs)
566 }
567 }
568 c.mu.RUnlock()
569
570 if closed {
571 return nil, pool.ErrClosed
572 }
573 if len(addrs) == 0 {
574 return nil, errClusterNoNodes
575 }
576 return addrs, nil
577 }
578
579 func (c *clusterNodes) NextGeneration() uint32 {
580 return atomic.AddUint32(&c.generation, 1)
581 }
582
583 // GC closes and removes nodes that are not part of the cluster state identified by generation, and rebuilds activeAddrs.
584 func (c *clusterNodes) GC(generation uint32) {
585 var collected []*clusterNode
586
587 c.mu.Lock()
588
589 c.activeAddrs = c.activeAddrs[:0]
590 now := time.Now()
591 for addr, node := range c.nodes {
592 if node.Generation() >= generation {
593 c.activeAddrs = append(c.activeAddrs, addr)
594 if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
595 go node.updateLatency()
596 }
597 continue
598 }
599
600 delete(c.nodes, addr)
601 collected = append(collected, node)
602 }
603
604 c.mu.Unlock()
605
606 for _, node := range collected {
607 _ = node.Client.Close()
608 }
609 }
610
611 func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
612 node, err := c.get(addr)
613 if err != nil {
614 return nil, err
615 }
616 if node != nil {
617 return node, nil
618 }
619
620 c.mu.Lock()
621 defer c.mu.Unlock()
622
623 if c.closed {
624 return nil, pool.ErrClosed
625 }
626
627 node, ok := c.nodes[addr]
628 if ok {
629 return node, nil
630 }
631
632 node = newClusterNode(c.opt, addr)
633 for _, fn := range c.onNewNode {
634 fn(node.Client)
635 }
636
637 c.addrs = appendIfNotExist(c.addrs, addr)
638 c.nodes[addr] = node
639
640 return node, nil
641 }
642
643 func (c *clusterNodes) get(addr string) (*clusterNode, error) {
644 c.mu.RLock()
645 defer c.mu.RUnlock()
646
647 if c.closed {
648 return nil, pool.ErrClosed
649 }
650 return c.nodes[addr], nil
651 }
652
653 func (c *clusterNodes) All() ([]*clusterNode, error) {
654 c.mu.RLock()
655 defer c.mu.RUnlock()
656
657 if c.closed {
658 return nil, pool.ErrClosed
659 }
660
661 cp := make([]*clusterNode, 0, len(c.nodes))
662 for _, node := range c.nodes {
663 cp = append(cp, node)
664 }
665 return cp, nil
666 }
667
668 func (c *clusterNodes) Random() (*clusterNode, error) {
669 addrs, err := c.Addrs()
670 if err != nil {
671 return nil, err
672 }
673
674 n := rand.Intn(len(addrs))
675 return c.GetOrCreate(addrs[n])
676 }
677
678
679
680 type clusterSlot struct {
681 start int
682 end int
683 nodes []*clusterNode
684 }
685
686 type clusterSlotSlice []*clusterSlot
687
688 func (p clusterSlotSlice) Len() int {
689 return len(p)
690 }
691
692 func (p clusterSlotSlice) Less(i, j int) bool {
693 return p[i].start < p[j].start
694 }
695
696 func (p clusterSlotSlice) Swap(i, j int) {
697 p[i], p[j] = p[j], p[i]
698 }
699
700 type clusterState struct {
701 nodes *clusterNodes
702 Masters []*clusterNode
703 Slaves []*clusterNode
704
705 slots []*clusterSlot
706
707 generation uint32
708 createdAt time.Time
709 }
710
711 func newClusterState(
712 nodes *clusterNodes, slots []ClusterSlot, origin string,
713 ) (*clusterState, error) {
714 c := clusterState{
715 nodes: nodes,
716
717 slots: make([]*clusterSlot, 0, len(slots)),
718
719 generation: nodes.NextGeneration(),
720 createdAt: time.Now(),
721 }
722
723 originHost, _, _ := net.SplitHostPort(origin)
724 isLoopbackOrigin := isLoopback(originHost)
725
726 for _, slot := range slots {
727 var nodes []*clusterNode
728 for i, slotNode := range slot.Nodes {
729 addr := slotNode.Addr
730 if !isLoopbackOrigin {
731 addr = replaceLoopbackHost(addr, originHost)
732 }
733
734 node, err := c.nodes.GetOrCreate(addr)
735 if err != nil {
736 return nil, err
737 }
738
739 node.SetGeneration(c.generation)
740 nodes = append(nodes, node)
741
742 if i == 0 {
743 c.Masters = appendIfNotExist(c.Masters, node)
744 } else {
745 c.Slaves = appendIfNotExist(c.Slaves, node)
746 }
747 }
748
749 c.slots = append(c.slots, &clusterSlot{
750 start: slot.Start,
751 end: slot.End,
752 nodes: nodes,
753 })
754 }
755
756 sort.Sort(clusterSlotSlice(c.slots))
757
758 time.AfterFunc(time.Minute, func() {
759 nodes.GC(c.generation)
760 })
761
762 return &c, nil
763 }
764
765 func replaceLoopbackHost(nodeAddr, originHost string) string {
766 nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
767 if err != nil {
768 return nodeAddr
769 }
770
771 nodeIP := net.ParseIP(nodeHost)
772 if nodeIP == nil {
773 return nodeAddr
774 }
775
776 if !nodeIP.IsLoopback() {
777 return nodeAddr
778 }
779
780
781 return net.JoinHostPort(originHost, nodePort)
782 }
783
784
785
786 // isLoopback reports whether host is a loopback IP address, "localhost",
787 // or a *.docker.internal hostname.
788 func isLoopback(host string) bool {
789 ip := net.ParseIP(host)
790 if ip != nil {
791 return ip.IsLoopback()
792 }
793
794 if strings.ToLower(host) == "localhost" {
795 return true
796 }
797
798 if strings.HasSuffix(strings.ToLower(host), ".docker.internal") {
799 return true
800 }
801
802 return false
803 }
804
805 func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
806 nodes := c.slotNodes(slot)
807 if len(nodes) > 0 {
808 return nodes[0], nil
809 }
810 return c.nodes.Random()
811 }
812
813 func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
814 nodes := c.slotNodes(slot)
815 switch len(nodes) {
816 case 0:
817 return c.nodes.Random()
818 case 1:
819 return nodes[0], nil
820 case 2:
821 slave := nodes[1]
822 if !slave.Failing() && !slave.Loading() {
823 return slave, nil
824 }
825 return nodes[0], nil
826 default:
827 var slave *clusterNode
828 for i := 0; i < 10; i++ {
829 n := rand.Intn(len(nodes)-1) + 1
830 slave = nodes[n]
831 if !slave.Failing() && !slave.Loading() {
832 return slave, nil
833 }
834 }
835
836
837 return nodes[0], nil
838 }
839 }
840
841 func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
842 nodes := c.slotNodes(slot)
843 if len(nodes) == 0 {
844 return c.nodes.Random()
845 }
846
847 var allNodesFailing = true
848 var (
849 closestNonFailingNode *clusterNode
850 closestNode *clusterNode
851 minLatency time.Duration
852 )
853
854
855 minLatency = time.Duration(math.MaxInt64)
856
857 for _, n := range nodes {
858 if closestNode == nil || n.Latency() < minLatency {
859 closestNode = n
860 minLatency = n.Latency()
861 if !n.Failing() {
862 closestNonFailingNode = n
863 allNodesFailing = false
864 }
865 }
866 }
867
868
869 if !allNodesFailing && closestNonFailingNode != nil {
870 return closestNonFailingNode, nil
871 }
872
873
874 if minLatency < maximumNodeLatency && closestNode != nil {
875 internal.Logger.Printf(context.TODO(), "redis: all nodes are marked as failed, picking the temporarily failing node with lowest latency")
876 return closestNode, nil
877 }
878
879
880 internal.Logger.Printf(context.TODO(), "redis: pings to all nodes are failing, picking a random node across the cluster")
881 return c.nodes.Random()
882 }
883
884 func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
885 nodes := c.slotNodes(slot)
886 if len(nodes) == 0 {
887 return c.nodes.Random()
888 }
889 if len(nodes) == 1 {
890 return nodes[0], nil
891 }
892 randomNodes := rand.Perm(len(nodes))
893 for _, idx := range randomNodes {
894 if node := nodes[idx]; !node.Failing() {
895 return node, nil
896 }
897 }
898 return nodes[randomNodes[0]], nil
899 }
900
901 func (c *clusterState) slotNodes(slot int) []*clusterNode {
902 i := sort.Search(len(c.slots), func(i int) bool {
903 return c.slots[i].end >= slot
904 })
905 if i >= len(c.slots) {
906 return nil
907 }
908 x := c.slots[i]
909 if slot >= x.start && slot <= x.end {
910 return x.nodes
911 }
912 return nil
913 }
914
915
916
917 type clusterStateHolder struct {
918 load func(ctx context.Context) (*clusterState, error)
919
920 state atomic.Value
921 reloading uint32
922 }
923
924 func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
925 return &clusterStateHolder{
926 load: fn,
927 }
928 }
929
930 func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
931 state, err := c.load(ctx)
932 if err != nil {
933 return nil, err
934 }
935 c.state.Store(state)
936 return state, nil
937 }
938
939 func (c *clusterStateHolder) LazyReload() {
940 if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
941 return
942 }
943 go func() {
944 defer atomic.StoreUint32(&c.reloading, 0)
945
946 _, err := c.Reload(context.Background())
947 if err != nil {
948 return
949 }
950 time.Sleep(200 * time.Millisecond)
951 }()
952 }
953
954 func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
955 v := c.state.Load()
956 if v == nil {
957 return c.Reload(ctx)
958 }
959
960 state := v.(*clusterState)
961 if time.Since(state.createdAt) > 10*time.Second {
962 c.LazyReload()
963 }
964 return state, nil
965 }
966
967 func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
968 state, err := c.Reload(ctx)
969 if err == nil {
970 return state, nil
971 }
972 return c.Get(ctx)
973 }
974
975
976
977 // ClusterClient is a Redis Cluster client representing a pool of zero
978 // or more underlying connections. It's safe for concurrent use by
979 // multiple goroutines.
980 type ClusterClient struct {
981 opt *ClusterOptions
982 nodes *clusterNodes
983 state *clusterStateHolder
984 cmdsInfoCache *cmdsInfoCache
985 cmdable
986 hooksMixin
987 }
988
989 // NewClusterClient returns a Redis Cluster client as described in
990 // http://redis.io/topics/cluster-spec.
991 func NewClusterClient(opt *ClusterOptions) *ClusterClient {
992 if opt == nil {
993 panic("redis: NewClusterClient nil options")
994 }
995 opt.init()
996
997 c := &ClusterClient{
998 opt: opt,
999 nodes: newClusterNodes(opt),
1000 }
1001
1002 c.state = newClusterStateHolder(c.loadState)
1003 c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
1004 c.cmdable = c.Process
1005
1006 c.initHooks(hooks{
1007 dial: nil,
1008 process: c.process,
1009 pipeline: c.processPipeline,
1010 txPipeline: c.processTxPipeline,
1011 })
1012
1013 return c
1014 }
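
// exampleNewClusterClient is a minimal sketch of constructing a cluster client;
// the node addresses are placeholders for a locally running cluster.
func exampleNewClusterClient() error {
	rdb := NewClusterClient(&ClusterOptions{
		Addrs:    []string{":7000", ":7001", ":7002"},
		ReadOnly: true,
	})
	defer rdb.Close()

	ctx := context.Background()
	if err := rdb.Set(ctx, "key", "value", time.Hour).Err(); err != nil {
		return err
	}
	return rdb.Get(ctx, "key").Err()
}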
1015
1016 // Options returns read-only Options that were used to create the cluster client.
1017 func (c *ClusterClient) Options() *ClusterOptions {
1018 return c.opt
1019 }
1020
1021 // ReloadState reloads cluster state. If available it calls ClusterSlots func
1022 // to get cluster slots. Otherwise it uses the CLUSTER SLOTS command.
1023 func (c *ClusterClient) ReloadState(ctx context.Context) {
1024 c.state.LazyReload()
1025 }
1026
1027 // Close closes the cluster client, releasing any open resources.
1028 //
1029 // It is rare to Close a ClusterClient, as the ClusterClient is meant
1030 // to be long-lived and shared between many goroutines.
1031 func (c *ClusterClient) Close() error {
1032 return c.nodes.Close()
1033 }
1034
1035 func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
1036 err := c.processHook(ctx, cmd)
1037 cmd.SetErr(err)
1038 return err
1039 }
1040
1041 func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
1042 slot := c.cmdSlot(cmd, -1)
1043 var node *clusterNode
1044 var moved bool
1045 var ask bool
1046 var lastErr error
1047 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1048 // MOVED and ASK responses are not transient errors that require a retry
1049 // delay; they should be attempted immediately.
1050 if attempt > 0 && !moved && !ask {
1051 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1052 return err
1053 }
1054 }
1055
1056 if node == nil {
1057 var err error
1058 node, err = c.cmdNode(ctx, cmd.Name(), slot)
1059 if err != nil {
1060 return err
1061 }
1062 }
1063
1064 if ask {
1065 ask = false
1066
1067 pipe := node.Client.Pipeline()
1068 _ = pipe.Process(ctx, NewCmd(ctx, "asking"))
1069 _ = pipe.Process(ctx, cmd)
1070 _, lastErr = pipe.Exec(ctx)
1071 } else {
1072 lastErr = node.Client.Process(ctx, cmd)
1073 }
1074
1075
1076 if lastErr == nil {
1077 return nil
1078 }
1079 if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
1080 if isReadOnly {
1081 c.state.LazyReload()
1082 }
1083 node = nil
1084 continue
1085 }
1086
1087 // If a replica node is still loading its dataset, pick another node.
1088 if c.opt.ReadOnly && isLoadingError(lastErr) {
1089 node.MarkAsFailing()
1090 node = nil
1091 continue
1092 }
1093
1094 var addr string
1095 moved, ask, addr = isMovedError(lastErr)
1096 if moved || ask {
1097 c.state.LazyReload()
1098
1099 var err error
1100 node, err = c.nodes.GetOrCreate(addr)
1101 if err != nil {
1102 return err
1103 }
1104 continue
1105 }
1106
1107 if shouldRetry(lastErr, cmd.readTimeout() == nil) {
1108
1109 if attempt == 0 {
1110 continue
1111 }
1112
1113
1114 node.MarkAsFailing()
1115 node = nil
1116 continue
1117 }
1118
1119 return lastErr
1120 }
1121 return lastErr
1122 }
1123
1124 func (c *ClusterClient) OnNewNode(fn func(rdb *Client)) {
1125 c.nodes.OnNewNode(fn)
1126 }
1127
1128 // ForEachMaster concurrently calls the fn on each master node in the cluster.
1129 // It returns the first error if any.
1130 func (c *ClusterClient) ForEachMaster(
1131 ctx context.Context,
1132 fn func(ctx context.Context, client *Client) error,
1133 ) error {
1134 state, err := c.state.ReloadOrGet(ctx)
1135 if err != nil {
1136 return err
1137 }
1138
1139 var wg sync.WaitGroup
1140 errCh := make(chan error, 1)
1141
1142 for _, master := range state.Masters {
1143 wg.Add(1)
1144 go func(node *clusterNode) {
1145 defer wg.Done()
1146 err := fn(ctx, node.Client)
1147 if err != nil {
1148 select {
1149 case errCh <- err:
1150 default:
1151 }
1152 }
1153 }(master)
1154 }
1155
1156 wg.Wait()
1157
1158 select {
1159 case err := <-errCh:
1160 return err
1161 default:
1162 return nil
1163 }
1164 }
1165
1166 // ForEachSlave concurrently calls the fn on each slave node in the cluster.
1167 // It returns the first error if any.
1168 func (c *ClusterClient) ForEachSlave(
1169 ctx context.Context,
1170 fn func(ctx context.Context, client *Client) error,
1171 ) error {
1172 state, err := c.state.ReloadOrGet(ctx)
1173 if err != nil {
1174 return err
1175 }
1176
1177 var wg sync.WaitGroup
1178 errCh := make(chan error, 1)
1179
1180 for _, slave := range state.Slaves {
1181 wg.Add(1)
1182 go func(node *clusterNode) {
1183 defer wg.Done()
1184 err := fn(ctx, node.Client)
1185 if err != nil {
1186 select {
1187 case errCh <- err:
1188 default:
1189 }
1190 }
1191 }(slave)
1192 }
1193
1194 wg.Wait()
1195
1196 select {
1197 case err := <-errCh:
1198 return err
1199 default:
1200 return nil
1201 }
1202 }
1203
1204 // ForEachShard concurrently calls the fn on each known node in the cluster.
1205 // It returns the first error if any.
1206 func (c *ClusterClient) ForEachShard(
1207 ctx context.Context,
1208 fn func(ctx context.Context, client *Client) error,
1209 ) error {
1210 state, err := c.state.ReloadOrGet(ctx)
1211 if err != nil {
1212 return err
1213 }
1214
1215 var wg sync.WaitGroup
1216 errCh := make(chan error, 1)
1217
1218 worker := func(node *clusterNode) {
1219 defer wg.Done()
1220 err := fn(ctx, node.Client)
1221 if err != nil {
1222 select {
1223 case errCh <- err:
1224 default:
1225 }
1226 }
1227 }
1228
1229 for _, node := range state.Masters {
1230 wg.Add(1)
1231 go worker(node)
1232 }
1233 for _, node := range state.Slaves {
1234 wg.Add(1)
1235 go worker(node)
1236 }
1237
1238 wg.Wait()
1239
1240 select {
1241 case err := <-errCh:
1242 return err
1243 default:
1244 return nil
1245 }
1246 }
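
// exampleForEachShard is a minimal sketch that pings every known node (masters
// and replicas) concurrently and returns the first error, if any.
func exampleForEachShard(ctx context.Context, rdb *ClusterClient) error {
	return rdb.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
		return shard.Ping(ctx).Err()
	})
}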
1247
1248 // PoolStats returns accumulated connection pool stats.
1249 func (c *ClusterClient) PoolStats() *PoolStats {
1250 var acc PoolStats
1251
1252 state, _ := c.state.Get(context.TODO())
1253 if state == nil {
1254 return &acc
1255 }
1256
1257 for _, node := range state.Masters {
1258 s := node.Client.connPool.Stats()
1259 acc.Hits += s.Hits
1260 acc.Misses += s.Misses
1261 acc.Timeouts += s.Timeouts
1262
1263 acc.TotalConns += s.TotalConns
1264 acc.IdleConns += s.IdleConns
1265 acc.StaleConns += s.StaleConns
1266 }
1267
1268 for _, node := range state.Slaves {
1269 s := node.Client.connPool.Stats()
1270 acc.Hits += s.Hits
1271 acc.Misses += s.Misses
1272 acc.Timeouts += s.Timeouts
1273
1274 acc.TotalConns += s.TotalConns
1275 acc.IdleConns += s.IdleConns
1276 acc.StaleConns += s.StaleConns
1277 }
1278
1279 return &acc
1280 }
1281
1282 func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
1283 if c.opt.ClusterSlots != nil {
1284 slots, err := c.opt.ClusterSlots(ctx)
1285 if err != nil {
1286 return nil, err
1287 }
1288 return newClusterState(c.nodes, slots, "")
1289 }
1290
1291 addrs, err := c.nodes.Addrs()
1292 if err != nil {
1293 return nil, err
1294 }
1295
1296 var firstErr error
1297
1298 for _, idx := range rand.Perm(len(addrs)) {
1299 addr := addrs[idx]
1300
1301 node, err := c.nodes.GetOrCreate(addr)
1302 if err != nil {
1303 if firstErr == nil {
1304 firstErr = err
1305 }
1306 continue
1307 }
1308
1309 slots, err := node.Client.ClusterSlots(ctx).Result()
1310 if err != nil {
1311 if firstErr == nil {
1312 firstErr = err
1313 }
1314 continue
1315 }
1316
1317 return newClusterState(c.nodes, slots, addr)
1318 }
1319
1320 // No node was reachable. It is possible that all node IPs have changed, so
1321 // clear activeAddrs to let the client fall back to the initial seed
1322 // addresses on the next reload, which gives DNS a chance to resolve
1323 // updated IP addresses.
1326 c.nodes.mu.Lock()
1327 c.nodes.activeAddrs = nil
1328 c.nodes.mu.Unlock()
1329
1330 return nil, firstErr
1331 }
1332
1333 func (c *ClusterClient) Pipeline() Pipeliner {
1334 pipe := Pipeline{
1335 exec: pipelineExecer(c.processPipelineHook),
1336 }
1337 pipe.init()
1338 return &pipe
1339 }
1340
1341 func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
1342 return c.Pipeline().Pipelined(ctx, fn)
1343 }
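
// examplePipelined is a minimal sketch of a cluster pipeline; commands are grouped
// by hash slot, sent to the owning nodes concurrently, and retried on MOVED/ASK
// redirects. The key name is a placeholder.
func examplePipelined(ctx context.Context, rdb *ClusterClient) error {
	var incr *IntCmd
	_, err := rdb.Pipelined(ctx, func(pipe Pipeliner) error {
		incr = pipe.Incr(ctx, "counter")
		pipe.Expire(ctx, "counter", time.Hour)
		return nil
	})
	if err != nil {
		return err
	}
	fmt.Println(incr.Val())
	return nil
}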
1344
1345 func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
1346 cmdsMap := newCmdsMap()
1347
1348 if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
1349 setCmdsErr(cmds, err)
1350 return err
1351 }
1352
1353 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1354 if attempt > 0 {
1355 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1356 setCmdsErr(cmds, err)
1357 return err
1358 }
1359 }
1360
1361 failedCmds := newCmdsMap()
1362 var wg sync.WaitGroup
1363
1364 for node, cmds := range cmdsMap.m {
1365 wg.Add(1)
1366 go func(node *clusterNode, cmds []Cmder) {
1367 defer wg.Done()
1368 c.processPipelineNode(ctx, node, cmds, failedCmds)
1369 }(node, cmds)
1370 }
1371
1372 wg.Wait()
1373 if len(failedCmds.m) == 0 {
1374 break
1375 }
1376 cmdsMap = failedCmds
1377 }
1378
1379 return cmdsFirstErr(cmds)
1380 }
1381
1382 func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
1383 state, err := c.state.Get(ctx)
1384 if err != nil {
1385 return err
1386 }
1387
1388 preferredRandomSlot := -1
1389 if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
1390 for _, cmd := range cmds {
1391 slot := c.cmdSlot(cmd, preferredRandomSlot)
1392 if preferredRandomSlot == -1 {
1393 preferredRandomSlot = slot
1394 }
1395 node, err := c.slotReadOnlyNode(state, slot)
1396 if err != nil {
1397 return err
1398 }
1399 cmdsMap.Add(node, cmd)
1400 }
1401 return nil
1402 }
1403
1404 for _, cmd := range cmds {
1405 slot := c.cmdSlot(cmd, preferredRandomSlot)
1406 if preferredRandomSlot == -1 {
1407 preferredRandomSlot = slot
1408 }
1409 node, err := state.slotMasterNode(slot)
1410 if err != nil {
1411 return err
1412 }
1413 cmdsMap.Add(node, cmd)
1414 }
1415 return nil
1416 }
1417
1418 func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
1419 for _, cmd := range cmds {
1420 cmdInfo := c.cmdInfo(ctx, cmd.Name())
1421 if cmdInfo == nil || !cmdInfo.ReadOnly {
1422 return false
1423 }
1424 }
1425 return true
1426 }
1427
1428 func (c *ClusterClient) processPipelineNode(
1429 ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
1430 ) {
1431 _ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1432 cn, err := node.Client.getConn(ctx)
1433 if err != nil {
1434 if !isContextError(err) {
1435 node.MarkAsFailing()
1436 }
1437 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1438 setCmdsErr(cmds, err)
1439 return err
1440 }
1441
1442 var processErr error
1443 defer func() {
1444 node.Client.releaseConn(ctx, cn, processErr)
1445 }()
1446 processErr = c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1447
1448 return processErr
1449 })
1450 }
1451
1452 func (c *ClusterClient) processPipelineNodeConn(
1453 ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1454 ) error {
1455 if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1456 return writeCmds(wr, cmds)
1457 }); err != nil {
1458 if isBadConn(err, false, node.Client.getAddr()) {
1459 node.MarkAsFailing()
1460 }
1461 if shouldRetry(err, true) {
1462 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1463 }
1464 setCmdsErr(cmds, err)
1465 return err
1466 }
1467
1468 return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
1469 return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
1470 })
1471 }
1472
1473 func (c *ClusterClient) pipelineReadCmds(
1474 ctx context.Context,
1475 node *clusterNode,
1476 rd *proto.Reader,
1477 cmds []Cmder,
1478 failedCmds *cmdsMap,
1479 ) error {
1480 for i, cmd := range cmds {
1481 err := cmd.readReply(rd)
1482 cmd.SetErr(err)
1483
1484 if err == nil {
1485 continue
1486 }
1487
1488 if c.checkMovedErr(ctx, cmd, err, failedCmds) {
1489 continue
1490 }
1491
1492 if c.opt.ReadOnly && isBadConn(err, false, node.Client.getAddr()) {
1493 node.MarkAsFailing()
1494 }
1495
1496 if !isRedisError(err) {
1497 if shouldRetry(err, true) {
1498 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1499 }
1500 setCmdsErr(cmds[i+1:], err)
1501 return err
1502 }
1503 }
1504
1505 if err := cmds[0].Err(); err != nil && shouldRetry(err, true) {
1506 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1507 return err
1508 }
1509
1510 return nil
1511 }
1512
1513 func (c *ClusterClient) checkMovedErr(
1514 ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
1515 ) bool {
1516 moved, ask, addr := isMovedError(err)
1517 if !moved && !ask {
1518 return false
1519 }
1520
1521 node, err := c.nodes.GetOrCreate(addr)
1522 if err != nil {
1523 return false
1524 }
1525
1526 if moved {
1527 c.state.LazyReload()
1528 failedCmds.Add(node, cmd)
1529 return true
1530 }
1531
1532 if ask {
1533 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
1534 return true
1535 }
1536
1537 panic("not reached")
1538 }
1539
1540 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
1541 func (c *ClusterClient) TxPipeline() Pipeliner {
1542 pipe := Pipeline{
1543 exec: func(ctx context.Context, cmds []Cmder) error {
1544 cmds = wrapMultiExec(ctx, cmds)
1545 return c.processTxPipelineHook(ctx, cmds)
1546 },
1547 }
1548 pipe.init()
1549 return &pipe
1550 }
1551
1552 func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
1553 return c.TxPipeline().Pipelined(ctx, fn)
1554 }
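
// exampleTxPipelined is a minimal sketch of a cluster MULTI/EXEC pipeline; the
// {user:1} hash tag keeps both placeholder keys in the same slot, which is required
// because cross-slot transactions fail with ErrCrossSlot.
func exampleTxPipelined(ctx context.Context, rdb *ClusterClient) error {
	_, err := rdb.TxPipelined(ctx, func(pipe Pipeliner) error {
		pipe.Set(ctx, "{user:1}:name", "alice", 0)
		pipe.Set(ctx, "{user:1}:visits", 1, 0)
		return nil
	})
	return err
}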
1555
1556 func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
1557 // Trim the MULTI and EXEC wrapper commands.
1558 cmds = cmds[1 : len(cmds)-1]
1559
1560 if len(cmds) == 0 {
1561 return nil
1562 }
1563
1564 state, err := c.state.Get(ctx)
1565 if err != nil {
1566 setCmdsErr(cmds, err)
1567 return err
1568 }
1569
1570 keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
1571 slot := -1
1572 switch len(keyedCmdsBySlot) {
1573 case 0:
1574 slot = hashtag.RandomSlot()
1575 case 1:
1576 for sl := range keyedCmdsBySlot {
1577 slot = sl
1578 break
1579 }
1580 default:
1581 // Keys hash to more than one slot; cluster transactions cannot be cross-slot.
1582 setCmdsErr(cmds, ErrCrossSlot)
1583 return ErrCrossSlot
1584 }
1585
1586 node, err := state.slotMasterNode(slot)
1587 if err != nil {
1588 setCmdsErr(cmds, err)
1589 return err
1590 }
1591
1592 cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1593 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1594 if attempt > 0 {
1595 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1596 setCmdsErr(cmds, err)
1597 return err
1598 }
1599 }
1600
1601 failedCmds := newCmdsMap()
1602 var wg sync.WaitGroup
1603
1604 for node, cmds := range cmdsMap {
1605 wg.Add(1)
1606 go func(node *clusterNode, cmds []Cmder) {
1607 defer wg.Done()
1608 c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1609 }(node, cmds)
1610 }
1611
1612 wg.Wait()
1613 if len(failedCmds.m) == 0 {
1614 break
1615 }
1616 cmdsMap = failedCmds.m
1617 }
1618
1619 return cmdsFirstErr(cmds)
1620 }
1621
1622 // slottedKeyedCommands returns a map of slot to commands, taking into account
1623 // only commands that have keys.
1624 func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
1625 cmdsSlots := map[int][]Cmder{}
1626
1627 preferredRandomSlot := -1
1628 for _, cmd := range cmds {
1629 if cmdFirstKeyPos(cmd) == 0 {
1630 continue
1631 }
1632
1633 slot := c.cmdSlot(cmd, preferredRandomSlot)
1634 if preferredRandomSlot == -1 {
1635 preferredRandomSlot = slot
1636 }
1637
1638 cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
1639 }
1640
1641 return cmdsSlots
1642 }
1643
1644 func (c *ClusterClient) processTxPipelineNode(
1645 ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
1646 ) {
1647 cmds = wrapMultiExec(ctx, cmds)
1648 _ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1649 cn, err := node.Client.getConn(ctx)
1650 if err != nil {
1651 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1652 setCmdsErr(cmds, err)
1653 return err
1654 }
1655
1656 var processErr error
1657 defer func() {
1658 node.Client.releaseConn(ctx, cn, processErr)
1659 }()
1660 processErr = c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1661
1662 return processErr
1663 })
1664 }
1665
1666 func (c *ClusterClient) processTxPipelineNodeConn(
1667 ctx context.Context, _ *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1668 ) error {
1669 if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1670 return writeCmds(wr, cmds)
1671 }); err != nil {
1672 if shouldRetry(err, true) {
1673 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1674 }
1675 setCmdsErr(cmds, err)
1676 return err
1677 }
1678
1679 return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
1680 statusCmd := cmds[0].(*StatusCmd)
1681 // Trim the MULTI and EXEC wrapper commands.
1682 trimmedCmds := cmds[1 : len(cmds)-1]
1683
1684 if err := c.txPipelineReadQueued(
1685 ctx, rd, statusCmd, trimmedCmds, failedCmds,
1686 ); err != nil {
1687 setCmdsErr(cmds, err)
1688
1689 moved, ask, addr := isMovedError(err)
1690 if moved || ask {
1691 return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
1692 }
1693
1694 return err
1695 }
1696
1697 return pipelineReadCmds(rd, trimmedCmds)
1698 })
1699 }
1700
1701 func (c *ClusterClient) txPipelineReadQueued(
1702 ctx context.Context,
1703 rd *proto.Reader,
1704 statusCmd *StatusCmd,
1705 cmds []Cmder,
1706 failedCmds *cmdsMap,
1707 ) error {
1708 // Parse the reply to MULTI, then the +QUEUED replies for each command.
1709 if err := statusCmd.readReply(rd); err != nil {
1710 return err
1711 }
1712
1713 for _, cmd := range cmds {
1714 err := statusCmd.readReply(rd)
1715 if err != nil {
1716 if c.checkMovedErr(ctx, cmd, err, failedCmds) {
1717
1718 continue
1719 }
1720 cmd.SetErr(err)
1721 if !isRedisError(err) {
1722 return err
1723 }
1724 }
1725 }
1726
1727 // Parse the EXEC reply header (the number of queued replies).
1728 line, err := rd.ReadLine()
1729 if err != nil {
1730 if err == Nil {
1731 err = TxFailedErr
1732 }
1733 return err
1734 }
1735
1736 if line[0] != proto.RespArray {
1737 return fmt.Errorf("redis: expected '*', but got line %q", line)
1738 }
1739
1740 return nil
1741 }
1742
1743 func (c *ClusterClient) cmdsMoved(
1744 ctx context.Context, cmds []Cmder,
1745 moved, ask bool,
1746 addr string,
1747 failedCmds *cmdsMap,
1748 ) error {
1749 node, err := c.nodes.GetOrCreate(addr)
1750 if err != nil {
1751 return err
1752 }
1753
1754 if moved {
1755 c.state.LazyReload()
1756 for _, cmd := range cmds {
1757 failedCmds.Add(node, cmd)
1758 }
1759 return nil
1760 }
1761
1762 if ask {
1763 for _, cmd := range cmds {
1764 failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
1765 }
1766 return nil
1767 }
1768
1769 return nil
1770 }
1771
1772 func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
1773 if len(keys) == 0 {
1774 return fmt.Errorf("redis: Watch requires at least one key")
1775 }
1776
1777 slot := hashtag.Slot(keys[0])
1778 for _, key := range keys[1:] {
1779 if hashtag.Slot(key) != slot {
1780 err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
1781 return err
1782 }
1783 }
1784
1785 node, err := c.slotMasterNode(ctx, slot)
1786 if err != nil {
1787 return err
1788 }
1789
1790 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1791 if attempt > 0 {
1792 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1793 return err
1794 }
1795 }
1796
1797 err = node.Client.Watch(ctx, fn, keys...)
1798 if err == nil {
1799 break
1800 }
1801
1802 moved, ask, addr := isMovedError(err)
1803 if moved || ask {
1804 node, err = c.nodes.GetOrCreate(addr)
1805 if err != nil {
1806 return err
1807 }
1808 continue
1809 }
1810
1811 if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
1812 if isReadOnly {
1813 c.state.LazyReload()
1814 }
1815 node, err = c.slotMasterNode(ctx, slot)
1816 if err != nil {
1817 return err
1818 }
1819 continue
1820 }
1821
1822 if shouldRetry(err, true) {
1823 continue
1824 }
1825
1826 return err
1827 }
1828
1829 return err
1830 }
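
// exampleWatch is a minimal sketch of an optimistic transaction on the cluster;
// all watched keys must hash to the same slot, and the key name below is a placeholder.
func exampleWatch(ctx context.Context, rdb *ClusterClient) error {
	const key = "{counter}"
	return rdb.Watch(ctx, func(tx *Tx) error {
		n, err := tx.Get(ctx, key).Int()
		if err != nil && err != Nil {
			return err
		}
		// The increment only commits if key was not modified while watched.
		_, err = tx.TxPipelined(ctx, func(pipe Pipeliner) error {
			pipe.Set(ctx, key, n+1, 0)
			return nil
		})
		return err
	}, key)
}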
1831
1832 func (c *ClusterClient) pubSub() *PubSub {
1833 var node *clusterNode
1834 pubsub := &PubSub{
1835 opt: c.opt.clientOptions(),
1836
1837 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
1838 if node != nil {
1839 panic("node != nil")
1840 }
1841
1842 var err error
1843
1844 if len(channels) > 0 {
1845 slot := hashtag.Slot(channels[0])
1846
1847
1848
1849 if c.opt.ReadOnly {
1850 state, err := c.state.Get(ctx)
1851 if err != nil {
1852 return nil, err
1853 }
1854
1855 node, err = c.slotReadOnlyNode(state, slot)
1856 if err != nil {
1857 return nil, err
1858 }
1859 } else {
1860 node, err = c.slotMasterNode(ctx, slot)
1861 if err != nil {
1862 return nil, err
1863 }
1864 }
1865 } else {
1866 node, err = c.nodes.Random()
1867 if err != nil {
1868 return nil, err
1869 }
1870 }
1871
1872 cn, err := node.Client.newConn(context.TODO())
1873 if err != nil {
1874 node = nil
1875
1876 return nil, err
1877 }
1878
1879 return cn, nil
1880 },
1881 closeConn: func(cn *pool.Conn) error {
1882 err := node.Client.connPool.CloseConn(cn)
1883 node = nil
1884 return err
1885 },
1886 }
1887 pubsub.init()
1888
1889 return pubsub
1890 }
1891
1892 // Subscribe subscribes the client to the specified channels.
1893 // Channels can be omitted to create an empty subscription.
1894 func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
1895 pubsub := c.pubSub()
1896 if len(channels) > 0 {
1897 _ = pubsub.Subscribe(ctx, channels...)
1898 }
1899 return pubsub
1900 }
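
// exampleSubscribe is a minimal sketch of cluster pub/sub; the channel name is a
// placeholder and the loop ends when the PubSub is closed.
func exampleSubscribe(ctx context.Context, rdb *ClusterClient) {
	pubsub := rdb.Subscribe(ctx, "news")
	defer pubsub.Close()

	for msg := range pubsub.Channel() {
		fmt.Println(msg.Channel, msg.Payload)
	}
}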
1901
1902 // PSubscribe subscribes the client to the given patterns.
1903 // Patterns can be omitted to create an empty subscription.
1904 func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
1905 pubsub := c.pubSub()
1906 if len(channels) > 0 {
1907 _ = pubsub.PSubscribe(ctx, channels...)
1908 }
1909 return pubsub
1910 }
1911
1912 // SSubscribe subscribes the client to the specified shard channels.
1913 func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) *PubSub {
1914 pubsub := c.pubSub()
1915 if len(channels) > 0 {
1916 _ = pubsub.SSubscribe(ctx, channels...)
1917 }
1918 return pubsub
1919 }
1920
1921 func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
1922 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
1923 }
1924
1925 func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
1926
1927 const nodeLimit = 3
1928
1929 addrs, err := c.nodes.Addrs()
1930 if err != nil {
1931 return nil, err
1932 }
1933
1934 var firstErr error
1935
1936 perm := rand.Perm(len(addrs))
1937 if len(perm) > nodeLimit {
1938 perm = perm[:nodeLimit]
1939 }
1940
1941 for _, idx := range perm {
1942 addr := addrs[idx]
1943
1944 node, err := c.nodes.GetOrCreate(addr)
1945 if err != nil {
1946 if firstErr == nil {
1947 firstErr = err
1948 }
1949 continue
1950 }
1951
1952 info, err := node.Client.Command(ctx).Result()
1953 if err == nil {
1954 return info, nil
1955 }
1956 if firstErr == nil {
1957 firstErr = err
1958 }
1959 }
1960
1961 if firstErr == nil {
1962 panic("not reached")
1963 }
1964 return nil, firstErr
1965 }
1966
1967 func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
1968 cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
1969 if err != nil {
1970 internal.Logger.Printf(context.TODO(), "getting command info: %s", err)
1971 return nil
1972 }
1973
1974 info := cmdsInfo[name]
1975 if info == nil {
1976 internal.Logger.Printf(context.TODO(), "info for cmd=%s not found", name)
1977 }
1978 return info
1979 }
1980
1981 func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
1982 args := cmd.Args()
1983 if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
1984 return args[2].(int)
1985 }
1986
1987 return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
1988 }
1989
1990 func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
1991 if pos == 0 {
1992 if preferredRandomSlot != -1 {
1993 return preferredRandomSlot
1994 }
1995 return hashtag.RandomSlot()
1996 }
1997 firstKey := cmd.stringArg(pos)
1998 return hashtag.Slot(firstKey)
1999 }
2000
2001 func (c *ClusterClient) cmdNode(
2002 ctx context.Context,
2003 cmdName string,
2004 slot int,
2005 ) (*clusterNode, error) {
2006 state, err := c.state.Get(ctx)
2007 if err != nil {
2008 return nil, err
2009 }
2010
2011 if c.opt.ReadOnly {
2012 cmdInfo := c.cmdInfo(ctx, cmdName)
2013 if cmdInfo != nil && cmdInfo.ReadOnly {
2014 return c.slotReadOnlyNode(state, slot)
2015 }
2016 }
2017 return state.slotMasterNode(slot)
2018 }
2019
2020 func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
2021 if c.opt.RouteByLatency {
2022 return state.slotClosestNode(slot)
2023 }
2024 if c.opt.RouteRandomly {
2025 return state.slotRandomNode(slot)
2026 }
2027 return state.slotSlaveNode(slot)
2028 }
2029
2030 func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
2031 state, err := c.state.Get(ctx)
2032 if err != nil {
2033 return nil, err
2034 }
2035 return state.slotMasterNode(slot)
2036 }
2037
2038
2039 // SlaveForKey gets a client for a replica node to run any command on it.
2040 // This is especially useful if we want to run a particular lua script which has
2041 // only read only commands on the replica. This is because other redis commands
2042 // generally have a flag that points that they are read only and automatically
2043 // run on the replica nodes if ClusterOptions.ReadOnly flag is set to true.
2044 func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
2045 state, err := c.state.Get(ctx)
2046 if err != nil {
2047 return nil, err
2048 }
2049 slot := hashtag.Slot(key)
2050 node, err := c.slotReadOnlyNode(state, slot)
2051 if err != nil {
2052 return nil, err
2053 }
2054 return node.Client, err
2055 }
2056
2057 // MasterForKey returns a client to the master node for a particular key.
2058 func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
2059 slot := hashtag.Slot(key)
2060 node, err := c.slotMasterNode(ctx, slot)
2061 if err != nil {
2062 return nil, err
2063 }
2064 return node.Client, nil
2065 }
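
// exampleMasterForKey is a minimal sketch that resolves the master node owning a
// placeholder key's hash slot and runs a command directly on that node.
func exampleMasterForKey(ctx context.Context, rdb *ClusterClient) error {
	master, err := rdb.MasterForKey(ctx, "mykey")
	if err != nil {
		return err
	}
	return master.Ping(ctx).Err()
}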
2066
2067 func (c *ClusterClient) context(ctx context.Context) context.Context {
2068 if c.opt.ContextTimeoutEnabled {
2069 return ctx
2070 }
2071 return context.Background()
2072 }
2073
2074 func appendIfNotExist[T comparable](vals []T, newVal T) []T {
2075 for _, v := range vals {
2076 if v == newVal {
2077 return vals
2078 }
2079 }
2080 return append(vals, newVal)
2081 }
2082
2083
2084
2085 type cmdsMap struct {
2086 mu sync.Mutex
2087 m map[*clusterNode][]Cmder
2088 }
2089
2090 func newCmdsMap() *cmdsMap {
2091 return &cmdsMap{
2092 m: make(map[*clusterNode][]Cmder),
2093 }
2094 }
2095
2096 func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
2097 m.mu.Lock()
2098 m.m[node] = append(m.m[node], cmds...)
2099 m.mu.Unlock()
2100 }
2101