1 package redis
2
3 import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "net"
9 "net/url"
10 "strconv"
11 "strings"
12 "sync"
13 "time"
14
15 "github.com/redis/go-redis/v9/auth"
16 "github.com/redis/go-redis/v9/internal"
17 "github.com/redis/go-redis/v9/internal/pool"
18 "github.com/redis/go-redis/v9/internal/rand"
19 "github.com/redis/go-redis/v9/internal/util"
20 )
21
22
23
24
25
// FailoverOptions configures clients that locate their Redis servers through
// sentinels. It is consumed by NewFailoverClient and NewFailoverClusterClient
// and can be built from a URL with ParseFailoverURL.
type FailoverOptions struct {
	// MasterName is the master name passed to the sentinel commands
	// (get-master-addr-by-name, replicas, sentinels).
	MasterName string
	// SentinelAddrs is the initial list of host:port sentinel addresses.
	// The constructors copy it; further sentinels may be discovered at runtime.
	SentinelAddrs []string

	// ClientName is forwarded unchanged to the sentinel, single-node and
	// cluster options derived from this struct.
	ClientName string

	// SentinelUsername is used as the Username of connections made to the
	// sentinels (not to the master/replicas).
	SentinelUsername string

	// SentinelPassword is used as the Password of connections made to the
	// sentinels (not to the master/replicas).
	SentinelPassword string

	// RouteByLatency is forwarded to ClusterOptions.RouteByLatency.
	// NewFailoverClient panics when it is set; use NewFailoverClusterClient.
	RouteByLatency bool

	// RouteRandomly is forwarded to ClusterOptions.RouteRandomly.
	// NewFailoverClient panics when it is set; use NewFailoverClusterClient.
	RouteRandomly bool

	// ReplicaOnly makes the failover client dial a randomly chosen replica
	// instead of the master, and is forwarded as ClusterOptions.ReadOnly.
	ReplicaOnly bool

	// UseDisconnectedReplicas allows replicas flagged "disconnected" by the
	// sentinel to be used when no other replica is available. Replicas flagged
	// s_down or o_down are never used.
	UseDisconnectedReplicas bool

	// Dialer, when set, is used to open sentinel connections and, with the
	// address resolved through the sentinels, master/replica connections.
	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
	// OnConnect is forwarded to every derived options struct.
	OnConnect func(ctx context.Context, cn *Conn) error

	// Protocol, Username and Password are forwarded to the master/replica
	// (client and cluster) options; sentinel connections use the
	// Sentinel* credentials instead.
	Protocol int
	Username string
	Password string

	// CredentialsProvider is forwarded to the client and cluster options.
	CredentialsProvider func() (username string, password string)

	// CredentialsProviderContext is the context-aware, error-returning variant
	// of CredentialsProvider; it is forwarded to the client and cluster options.
	// NOTE(review): precedence between the providers and Username/Password is
	// decided outside this file.
	CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)

	// StreamingCredentialsProvider is forwarded to the client and cluster
	// options.
	// NOTE(review): precedence relative to the other credential sources is
	// decided outside this file.
	StreamingCredentialsProvider auth.StreamingCredentialsProvider
	// DB is the database used on the master/replicas. Sentinel connections
	// always use DB 0; the cluster-style client issues SELECT on connect when
	// DB is non-zero.
	DB int

	// MaxRetries is forwarded as Options.MaxRetries and, for the cluster-style
	// client, as ClusterOptions.MaxRedirects.
	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	// DialTimeout is also used as the timeout of the default network dialer
	// for master/replica connections.
	DialTimeout           time.Duration
	ReadTimeout           time.Duration
	WriteTimeout          time.Duration
	ContextTimeoutEnabled bool

	// ReadBufferSize is forwarded to the client and cluster options.
	// Sentinel connections always use a fixed 4096-byte buffer instead.
	ReadBufferSize int

	// WriteBufferSize is forwarded to the client and cluster options.
	// Sentinel connections always use a fixed 4096-byte buffer instead.
	WriteBufferSize int

	// PoolFIFO and the pool settings below are forwarded unchanged to every
	// derived options struct, including the sentinel connections.
	PoolFIFO bool

	PoolSize        int
	PoolTimeout     time.Duration
	MinIdleConns    int
	MaxIdleConns    int
	MaxActiveConns  int
	ConnMaxIdleTime time.Duration
	ConnMaxLifetime time.Duration

	// TLSConfig enables TLS when non-nil. ParseFailoverURL sets it for the
	// "rediss" scheme.
	TLSConfig *tls.Config

	// DisableIndentity is the misspelled twin of DisableIdentity and is
	// forwarded as-is to the derived options.
	// NOTE(review): presumably kept only for backward compatibility; prefer
	// DisableIdentity.
	DisableIndentity bool

	// DisableIdentity is forwarded to the derived options.
	// NOTE(review): presumably disables sending client identity information on
	// connect; confirm against Options.
	DisableIdentity bool

	// IdentitySuffix is forwarded to the derived options.
	IdentitySuffix string

	// FailingTimeoutSeconds is only forwarded to ClusterOptions, so it affects
	// NewFailoverClusterClient but not NewFailoverClient.
	FailingTimeoutSeconds int

	// UnstableResp3 is forwarded to the client and sentinel options (it is not
	// part of the derived cluster options).
	UnstableResp3 bool
}
140
141 func (opt *FailoverOptions) clientOptions() *Options {
142 return &Options{
143 Addr: "FailoverClient",
144 ClientName: opt.ClientName,
145
146 Dialer: opt.Dialer,
147 OnConnect: opt.OnConnect,
148
149 DB: opt.DB,
150 Protocol: opt.Protocol,
151 Username: opt.Username,
152 Password: opt.Password,
153 CredentialsProvider: opt.CredentialsProvider,
154 CredentialsProviderContext: opt.CredentialsProviderContext,
155 StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
156
157 MaxRetries: opt.MaxRetries,
158 MinRetryBackoff: opt.MinRetryBackoff,
159 MaxRetryBackoff: opt.MaxRetryBackoff,
160
161 ReadBufferSize: opt.ReadBufferSize,
162 WriteBufferSize: opt.WriteBufferSize,
163
164 DialTimeout: opt.DialTimeout,
165 ReadTimeout: opt.ReadTimeout,
166 WriteTimeout: opt.WriteTimeout,
167 ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
168
169 PoolFIFO: opt.PoolFIFO,
170 PoolSize: opt.PoolSize,
171 PoolTimeout: opt.PoolTimeout,
172 MinIdleConns: opt.MinIdleConns,
173 MaxIdleConns: opt.MaxIdleConns,
174 MaxActiveConns: opt.MaxActiveConns,
175 ConnMaxIdleTime: opt.ConnMaxIdleTime,
176 ConnMaxLifetime: opt.ConnMaxLifetime,
177
178 TLSConfig: opt.TLSConfig,
179
180 DisableIdentity: opt.DisableIdentity,
181 DisableIndentity: opt.DisableIndentity,
182
183 IdentitySuffix: opt.IdentitySuffix,
184 UnstableResp3: opt.UnstableResp3,
185 }
186 }
187
188 func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
189 return &Options{
190 Addr: addr,
191 ClientName: opt.ClientName,
192
193 Dialer: opt.Dialer,
194 OnConnect: opt.OnConnect,
195
196 DB: 0,
197 Username: opt.SentinelUsername,
198 Password: opt.SentinelPassword,
199
200 MaxRetries: opt.MaxRetries,
201 MinRetryBackoff: opt.MinRetryBackoff,
202 MaxRetryBackoff: opt.MaxRetryBackoff,
203
204
205 ReadBufferSize: 4096,
206 WriteBufferSize: 4096,
207
208 DialTimeout: opt.DialTimeout,
209 ReadTimeout: opt.ReadTimeout,
210 WriteTimeout: opt.WriteTimeout,
211 ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
212
213 PoolFIFO: opt.PoolFIFO,
214 PoolSize: opt.PoolSize,
215 PoolTimeout: opt.PoolTimeout,
216 MinIdleConns: opt.MinIdleConns,
217 MaxIdleConns: opt.MaxIdleConns,
218 MaxActiveConns: opt.MaxActiveConns,
219 ConnMaxIdleTime: opt.ConnMaxIdleTime,
220 ConnMaxLifetime: opt.ConnMaxLifetime,
221
222 TLSConfig: opt.TLSConfig,
223
224 DisableIdentity: opt.DisableIdentity,
225 DisableIndentity: opt.DisableIndentity,
226
227 IdentitySuffix: opt.IdentitySuffix,
228 UnstableResp3: opt.UnstableResp3,
229 }
230 }
231
232 func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
233 return &ClusterOptions{
234 ClientName: opt.ClientName,
235
236 Dialer: opt.Dialer,
237 OnConnect: opt.OnConnect,
238
239 Protocol: opt.Protocol,
240 Username: opt.Username,
241 Password: opt.Password,
242 CredentialsProvider: opt.CredentialsProvider,
243 CredentialsProviderContext: opt.CredentialsProviderContext,
244 StreamingCredentialsProvider: opt.StreamingCredentialsProvider,
245
246 MaxRedirects: opt.MaxRetries,
247
248 ReadOnly: opt.ReplicaOnly,
249 RouteByLatency: opt.RouteByLatency,
250 RouteRandomly: opt.RouteRandomly,
251
252 MinRetryBackoff: opt.MinRetryBackoff,
253 MaxRetryBackoff: opt.MaxRetryBackoff,
254
255 ReadBufferSize: opt.ReadBufferSize,
256 WriteBufferSize: opt.WriteBufferSize,
257
258 DialTimeout: opt.DialTimeout,
259 ReadTimeout: opt.ReadTimeout,
260 WriteTimeout: opt.WriteTimeout,
261 ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
262
263 PoolFIFO: opt.PoolFIFO,
264 PoolSize: opt.PoolSize,
265 PoolTimeout: opt.PoolTimeout,
266 MinIdleConns: opt.MinIdleConns,
267 MaxIdleConns: opt.MaxIdleConns,
268 MaxActiveConns: opt.MaxActiveConns,
269 ConnMaxIdleTime: opt.ConnMaxIdleTime,
270 ConnMaxLifetime: opt.ConnMaxLifetime,
271
272 TLSConfig: opt.TLSConfig,
273
274 DisableIdentity: opt.DisableIdentity,
275 DisableIndentity: opt.DisableIndentity,
276 IdentitySuffix: opt.IdentitySuffix,
277 FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318 func ParseFailoverURL(redisURL string) (*FailoverOptions, error) {
319 u, err := url.Parse(redisURL)
320 if err != nil {
321 return nil, err
322 }
323 return setupFailoverConn(u)
324 }
325
326 func setupFailoverConn(u *url.URL) (*FailoverOptions, error) {
327 o := &FailoverOptions{}
328
329 o.SentinelUsername, o.SentinelPassword = getUserPassword(u)
330
331 h, p := getHostPortWithDefaults(u)
332 o.SentinelAddrs = append(o.SentinelAddrs, net.JoinHostPort(h, p))
333
334 switch u.Scheme {
335 case "rediss":
336 o.TLSConfig = &tls.Config{ServerName: h, MinVersion: tls.VersionTLS12}
337 case "redis":
338 o.TLSConfig = nil
339 default:
340 return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
341 }
342
343 f := strings.FieldsFunc(u.Path, func(r rune) bool {
344 return r == '/'
345 })
346 switch len(f) {
347 case 0:
348 o.DB = 0
349 case 1:
350 var err error
351 if o.DB, err = strconv.Atoi(f[0]); err != nil {
352 return nil, fmt.Errorf("redis: invalid database number: %q", f[0])
353 }
354 default:
355 return nil, fmt.Errorf("redis: invalid URL path: %s", u.Path)
356 }
357
358 return setupFailoverConnParams(u, o)
359 }
360
361 func setupFailoverConnParams(u *url.URL, o *FailoverOptions) (*FailoverOptions, error) {
362 q := queryOptions{q: u.Query()}
363
364 o.MasterName = q.string("master_name")
365 o.ClientName = q.string("client_name")
366 o.RouteByLatency = q.bool("route_by_latency")
367 o.RouteRandomly = q.bool("route_randomly")
368 o.ReplicaOnly = q.bool("replica_only")
369 o.UseDisconnectedReplicas = q.bool("use_disconnected_replicas")
370 o.Protocol = q.int("protocol")
371 o.Username = q.string("username")
372 o.Password = q.string("password")
373 o.MaxRetries = q.int("max_retries")
374 o.MinRetryBackoff = q.duration("min_retry_backoff")
375 o.MaxRetryBackoff = q.duration("max_retry_backoff")
376 o.DialTimeout = q.duration("dial_timeout")
377 o.ReadTimeout = q.duration("read_timeout")
378 o.WriteTimeout = q.duration("write_timeout")
379 o.ContextTimeoutEnabled = q.bool("context_timeout_enabled")
380 o.PoolFIFO = q.bool("pool_fifo")
381 o.PoolSize = q.int("pool_size")
382 o.MinIdleConns = q.int("min_idle_conns")
383 o.MaxIdleConns = q.int("max_idle_conns")
384 o.MaxActiveConns = q.int("max_active_conns")
385 o.ConnMaxLifetime = q.duration("conn_max_lifetime")
386 o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
387 o.PoolTimeout = q.duration("pool_timeout")
388 o.DisableIdentity = q.bool("disableIdentity")
389 o.IdentitySuffix = q.string("identitySuffix")
390 o.UnstableResp3 = q.bool("unstable_resp3")
391
392 if q.err != nil {
393 return nil, q.err
394 }
395
396 if tmp := q.string("db"); tmp != "" {
397 db, err := strconv.Atoi(tmp)
398 if err != nil {
399 return nil, fmt.Errorf("redis: invalid database number: %w", err)
400 }
401 o.DB = db
402 }
403
404 addrs := q.strings("addr")
405 for _, addr := range addrs {
406 h, p, err := net.SplitHostPort(addr)
407 if err != nil || h == "" || p == "" {
408 return nil, fmt.Errorf("redis: unable to parse addr param: %s", addr)
409 }
410
411 o.SentinelAddrs = append(o.SentinelAddrs, net.JoinHostPort(h, p))
412 }
413
414 if o.TLSConfig != nil && q.has("skip_verify") {
415 o.TLSConfig.InsecureSkipVerify = q.bool("skip_verify")
416 }
417
418
419 if r := q.remaining(); len(r) > 0 {
420 return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
421 }
422
423 return o, nil
424 }
425
426
427
428
429 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
430 if failoverOpt == nil {
431 panic("redis: NewFailoverClient nil options")
432 }
433
434 if failoverOpt.RouteByLatency {
435 panic("to route commands by latency, use NewFailoverClusterClient")
436 }
437 if failoverOpt.RouteRandomly {
438 panic("to route commands randomly, use NewFailoverClusterClient")
439 }
440
441 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
442 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
443
444 rand.Shuffle(len(sentinelAddrs), func(i, j int) {
445 sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]
446 })
447
448 failover := &sentinelFailover{
449 opt: failoverOpt,
450 sentinelAddrs: sentinelAddrs,
451 }
452
453 opt := failoverOpt.clientOptions()
454 opt.Dialer = masterReplicaDialer(failover)
455 opt.init()
456
457 var connPool *pool.ConnPool
458
459 rdb := &Client{
460 baseClient: &baseClient{
461 opt: opt,
462 },
463 }
464 rdb.init()
465
466 connPool = newConnPool(opt, rdb.dialHook)
467 rdb.connPool = connPool
468 rdb.onClose = rdb.wrappedOnClose(failover.Close)
469
470 failover.mu.Lock()
471 failover.onFailover = func(ctx context.Context, addr string) {
472 _ = connPool.Filter(func(cn *pool.Conn) bool {
473 return cn.RemoteAddr().String() != addr
474 })
475 }
476 failover.mu.Unlock()
477
478 return rdb
479 }
480
481 func masterReplicaDialer(
482 failover *sentinelFailover,
483 ) func(ctx context.Context, network, addr string) (net.Conn, error) {
484 return func(ctx context.Context, network, _ string) (net.Conn, error) {
485 var addr string
486 var err error
487
488 if failover.opt.ReplicaOnly {
489 addr, err = failover.RandomReplicaAddr(ctx)
490 } else {
491 addr, err = failover.MasterAddr(ctx)
492 if err == nil {
493 failover.trySwitchMaster(ctx, addr)
494 }
495 }
496 if err != nil {
497 return nil, err
498 }
499 if failover.opt.Dialer != nil {
500 return failover.opt.Dialer(ctx, network, addr)
501 }
502
503 netDialer := &net.Dialer{
504 Timeout: failover.opt.DialTimeout,
505 KeepAlive: 5 * time.Minute,
506 }
507 if failover.opt.TLSConfig == nil {
508 return netDialer.DialContext(ctx, network, addr)
509 }
510 return tls.DialWithDialer(netDialer, network, addr, failover.opt.TLSConfig)
511 }
512 }
513
514
515
516
// SentinelClient is a client for talking to a Redis Sentinel. It embeds
// baseClient and adds the sentinel-specific commands defined in this file.
type SentinelClient struct {
	*baseClient
}
520
521 func NewSentinelClient(opt *Options) *SentinelClient {
522 if opt == nil {
523 panic("redis: NewSentinelClient nil options")
524 }
525 opt.init()
526 c := &SentinelClient{
527 baseClient: &baseClient{
528 opt: opt,
529 },
530 }
531
532 c.initHooks(hooks{
533 dial: c.baseClient.dial,
534 process: c.baseClient.process,
535 })
536 c.connPool = newConnPool(opt, c.dialHook)
537
538 return c
539 }
540
541 func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
542 err := c.processHook(ctx, cmd)
543 cmd.SetErr(err)
544 return err
545 }
546
547 func (c *SentinelClient) pubSub() *PubSub {
548 pubsub := &PubSub{
549 opt: c.opt,
550
551 newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
552 return c.newConn(ctx)
553 },
554 closeConn: c.connPool.CloseConn,
555 }
556 pubsub.init()
557 return pubsub
558 }
559
560
561
562 func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
563 cmd := NewStringCmd(ctx, "ping")
564 _ = c.Process(ctx, cmd)
565 return cmd
566 }
567
568
569
570 func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
571 pubsub := c.pubSub()
572 if len(channels) > 0 {
573 _ = pubsub.Subscribe(ctx, channels...)
574 }
575 return pubsub
576 }
577
578
579
580 func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
581 pubsub := c.pubSub()
582 if len(channels) > 0 {
583 _ = pubsub.PSubscribe(ctx, channels...)
584 }
585 return pubsub
586 }
587
588 func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
589 cmd := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
590 _ = c.Process(ctx, cmd)
591 return cmd
592 }
593
594 func (c *SentinelClient) Sentinels(ctx context.Context, name string) *MapStringStringSliceCmd {
595 cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "sentinels", name)
596 _ = c.Process(ctx, cmd)
597 return cmd
598 }
599
600
601
602 func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
603 cmd := NewStatusCmd(ctx, "sentinel", "failover", name)
604 _ = c.Process(ctx, cmd)
605 return cmd
606 }
607
608
609
610
611
612 func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
613 cmd := NewIntCmd(ctx, "sentinel", "reset", pattern)
614 _ = c.Process(ctx, cmd)
615 return cmd
616 }
617
618
619
620 func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
621 cmd := NewStatusCmd(ctx, "sentinel", "flushconfig")
622 _ = c.Process(ctx, cmd)
623 return cmd
624 }
625
626
627 func (c *SentinelClient) Master(ctx context.Context, name string) *MapStringStringCmd {
628 cmd := NewMapStringStringCmd(ctx, "sentinel", "master", name)
629 _ = c.Process(ctx, cmd)
630 return cmd
631 }
632
633
634 func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
635 cmd := NewSliceCmd(ctx, "sentinel", "masters")
636 _ = c.Process(ctx, cmd)
637 return cmd
638 }
639
640
641 func (c *SentinelClient) Replicas(ctx context.Context, name string) *MapStringStringSliceCmd {
642 cmd := NewMapStringStringSliceCmd(ctx, "sentinel", "replicas", name)
643 _ = c.Process(ctx, cmd)
644 return cmd
645 }
646
647
648
649
650
651 func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
652 cmd := NewStringCmd(ctx, "sentinel", "ckquorum", name)
653 _ = c.Process(ctx, cmd)
654 return cmd
655 }
656
657
658
659 func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
660 cmd := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
661 _ = c.Process(ctx, cmd)
662 return cmd
663 }
664
665
666 func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
667 cmd := NewStringCmd(ctx, "sentinel", "set", name, option, value)
668 _ = c.Process(ctx, cmd)
669 return cmd
670 }
671
672
673
674
675 func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
676 cmd := NewStringCmd(ctx, "sentinel", "remove", name)
677 _ = c.Process(ctx, cmd)
678 return cmd
679 }
680
681
682
// sentinelFailover tracks the sentinels of one master on behalf of a failover
// client: it resolves master and replica addresses, remembers the sentinel
// connection that last answered, and reacts to sentinel pub/sub messages.
type sentinelFailover struct {
	// opt is the user configuration; its MasterName selects the master.
	opt *FailoverOptions

	// sentinelAddrs lists the known sentinels. The sentinel that answered
	// last is moved to the front, and newly discovered sentinels are appended.
	sentinelAddrs []string

	// onFailover, when set, is called by trySwitchMaster with the new master
	// address. It runs while mu is held for writing.
	onFailover func(ctx context.Context, addr string)
	// onUpdate, when set, is called by listen when it starts and after each
	// pub/sub message it handles.
	onUpdate func(ctx context.Context)

	// mu guards masterAddr, sentinel and pubsub. The constructors also hold
	// it while installing the callbacks above.
	mu sync.RWMutex
	// masterAddr is the last master address accepted by trySwitchMaster.
	masterAddr string
	// sentinel is the currently selected sentinel connection, or nil.
	sentinel *SentinelClient
	// pubsub is the subscription opened on sentinel; it is set and cleared
	// together with sentinel.
	pubsub *PubSub
}
696
697 func (c *sentinelFailover) Close() error {
698 c.mu.Lock()
699 defer c.mu.Unlock()
700 if c.sentinel != nil {
701 return c.closeSentinel()
702 }
703 return nil
704 }
705
706 func (c *sentinelFailover) closeSentinel() error {
707 firstErr := c.pubsub.Close()
708 c.pubsub = nil
709
710 err := c.sentinel.Close()
711 if err != nil && firstErr == nil {
712 firstErr = err
713 }
714 c.sentinel = nil
715
716 return firstErr
717 }
718
719 func (c *sentinelFailover) RandomReplicaAddr(ctx context.Context) (string, error) {
720 if c.opt == nil {
721 return "", errors.New("opt is nil")
722 }
723
724 addresses, err := c.replicaAddrs(ctx, false)
725 if err != nil {
726 return "", err
727 }
728
729 if len(addresses) == 0 && c.opt.UseDisconnectedReplicas {
730 addresses, err = c.replicaAddrs(ctx, true)
731 if err != nil {
732 return "", err
733 }
734 }
735
736 if len(addresses) == 0 {
737 return c.MasterAddr(ctx)
738 }
739 return addresses[rand.Intn(len(addresses))], nil
740 }
741
742 func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
743 c.mu.RLock()
744 sentinel := c.sentinel
745 c.mu.RUnlock()
746
747 if sentinel != nil {
748 addr, err := c.getMasterAddr(ctx, sentinel)
749 if err != nil {
750 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
751 return "", err
752 }
753
754 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
755 c.opt.MasterName, err)
756 } else {
757 return addr, nil
758 }
759 }
760
761 c.mu.Lock()
762 defer c.mu.Unlock()
763
764 if c.sentinel != nil {
765 addr, err := c.getMasterAddr(ctx, c.sentinel)
766 if err != nil {
767 _ = c.closeSentinel()
768 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
769 return "", err
770 }
771
772 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
773 c.opt.MasterName, err)
774 } else {
775 return addr, nil
776 }
777 }
778
779 var (
780 masterAddr string
781 wg sync.WaitGroup
782 once sync.Once
783 errCh = make(chan error, len(c.sentinelAddrs))
784 )
785
786 ctx, cancel := context.WithCancel(ctx)
787 defer cancel()
788
789 for i, sentinelAddr := range c.sentinelAddrs {
790 wg.Add(1)
791 go func(i int, addr string) {
792 defer wg.Done()
793 sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
794 addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
795 if err != nil {
796 internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
797 addr, c.opt.MasterName, err)
798 _ = sentinelCli.Close()
799 errCh <- err
800 return
801 }
802 once.Do(func() {
803 masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
804
805 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
806 c.setSentinel(ctx, sentinelCli)
807 internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
808 cancel()
809 })
810 }(i, sentinelAddr)
811 }
812
813 wg.Wait()
814 close(errCh)
815 if masterAddr != "" {
816 return masterAddr, nil
817 }
818 errs := make([]error, 0, len(errCh))
819 for err := range errCh {
820 errs = append(errs, err)
821 }
822 return "", fmt.Errorf("redis: all sentinels specified in configuration are unreachable: %s", joinErrors(errs))
823 }
824
825 func joinErrors(errs []error) string {
826 if len(errs) == 1 {
827 return errs[0].Error()
828 }
829
830 b := []byte(errs[0].Error())
831 for _, err := range errs[1:] {
832 b = append(b, '\n')
833 b = append(b, err.Error()...)
834 }
835 return util.BytesToString(b)
836 }
837
838 func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
839 c.mu.RLock()
840 sentinel := c.sentinel
841 c.mu.RUnlock()
842
843 if sentinel != nil {
844 addrs, err := c.getReplicaAddrs(ctx, sentinel)
845 if err != nil {
846 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
847 return nil, err
848 }
849
850 internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
851 c.opt.MasterName, err)
852 } else if len(addrs) > 0 {
853 return addrs, nil
854 }
855 }
856
857 c.mu.Lock()
858 defer c.mu.Unlock()
859
860 if c.sentinel != nil {
861 addrs, err := c.getReplicaAddrs(ctx, c.sentinel)
862 if err != nil {
863 _ = c.closeSentinel()
864 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
865 return nil, err
866 }
867
868 internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
869 c.opt.MasterName, err)
870 } else if len(addrs) > 0 {
871 return addrs, nil
872 } else {
873
874 _ = c.closeSentinel()
875 }
876 }
877
878 var sentinelReachable bool
879
880 for i, sentinelAddr := range c.sentinelAddrs {
881 sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
882
883 replicas, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
884 if err != nil {
885 _ = sentinel.Close()
886 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
887 return nil, err
888 }
889 internal.Logger.Printf(ctx, "sentinel: Replicas master=%q failed: %s",
890 c.opt.MasterName, err)
891 continue
892 }
893 sentinelReachable = true
894 addrs := parseReplicaAddrs(replicas, useDisconnected)
895 if len(addrs) == 0 {
896 continue
897 }
898
899 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
900 c.setSentinel(ctx, sentinel)
901
902 return addrs, nil
903 }
904
905 if sentinelReachable {
906 return []string{}, nil
907 }
908 return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
909 }
910
911 func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) (string, error) {
912 addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
913 if err != nil {
914 return "", err
915 }
916 return net.JoinHostPort(addr[0], addr[1]), nil
917 }
918
919 func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *SentinelClient) ([]string, error) {
920 addrs, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
921 if err != nil {
922 internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
923 c.opt.MasterName, err)
924 return nil, err
925 }
926 return parseReplicaAddrs(addrs, false), nil
927 }
928
// parseReplicaAddrs turns sentinel replica descriptions into host:port
// addresses. An entry is skipped when its "ip" or "port" is empty, when its
// comma-separated "flags" contain s_down or o_down, or when they contain
// disconnected and keepDisconnected is false. The result is never nil.
func parseReplicaAddrs(addrs []map[string]string, keepDisconnected bool) []string {
	usable := make([]string, 0, len(addrs))

nextReplica:
	for _, replica := range addrs {
		ip, port := replica["ip"], replica["port"]
		if ip == "" || port == "" {
			continue
		}

		// A missing "flags" key yields "", which matches none of the flags.
		for _, flag := range strings.Split(replica["flags"], ",") {
			if flag == "s_down" || flag == "o_down" {
				continue nextReplica
			}
			if flag == "disconnected" && !keepDisconnected {
				continue nextReplica
			}
		}

		usable = append(usable, net.JoinHostPort(ip, port))
	}

	return usable
}
952
953 func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
954 c.mu.RLock()
955 currentAddr := c.masterAddr
956 c.mu.RUnlock()
957
958 if addr == currentAddr {
959 return
960 }
961
962 c.mu.Lock()
963 defer c.mu.Unlock()
964
965 if addr == c.masterAddr {
966 return
967 }
968 c.masterAddr = addr
969
970 internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
971 c.opt.MasterName, addr)
972 if c.onFailover != nil {
973 c.onFailover(ctx, addr)
974 }
975 }
976
977 func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
978 if c.sentinel != nil {
979 panic("not reached")
980 }
981 c.sentinel = sentinel
982 c.discoverSentinels(ctx)
983
984 c.pubsub = sentinel.Subscribe(ctx, "+switch-master", "+replica-reconf-done")
985 go c.listen(c.pubsub)
986 }
987
988 func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
989 sentinels, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
990 if err != nil {
991 internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
992 return
993 }
994 for _, sentinel := range sentinels {
995 ip, ok := sentinel["ip"]
996 if !ok {
997 continue
998 }
999 port, ok := sentinel["port"]
1000 if !ok {
1001 continue
1002 }
1003 if ip != "" && port != "" {
1004 sentinelAddr := net.JoinHostPort(ip, port)
1005 if !contains(c.sentinelAddrs, sentinelAddr) {
1006 internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
1007 sentinelAddr, c.opt.MasterName)
1008 c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
1009 }
1010 }
1011 }
1012 }
1013
1014 func (c *sentinelFailover) listen(pubsub *PubSub) {
1015 ctx := context.TODO()
1016
1017 if c.onUpdate != nil {
1018 c.onUpdate(ctx)
1019 }
1020
1021 ch := pubsub.Channel()
1022 for msg := range ch {
1023 if msg.Channel == "+switch-master" {
1024 parts := strings.Split(msg.Payload, " ")
1025 if parts[0] != c.opt.MasterName {
1026 internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
1027 continue
1028 }
1029 addr := net.JoinHostPort(parts[3], parts[4])
1030 c.trySwitchMaster(pubsub.getContext(), addr)
1031 }
1032
1033 if c.onUpdate != nil {
1034 c.onUpdate(ctx)
1035 }
1036 }
1037 }
1038
// contains reports whether str is an element of slice.
func contains(slice []string, str string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == str {
			return true
		}
	}
	return false
}
1047
1048
1049
1050
1051
1052 func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
1053 if failoverOpt == nil {
1054 panic("redis: NewFailoverClusterClient nil options")
1055 }
1056
1057 sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))
1058 copy(sentinelAddrs, failoverOpt.SentinelAddrs)
1059
1060 failover := &sentinelFailover{
1061 opt: failoverOpt,
1062 sentinelAddrs: sentinelAddrs,
1063 }
1064
1065 opt := failoverOpt.clusterOptions()
1066 if failoverOpt.DB != 0 {
1067 onConnect := opt.OnConnect
1068
1069 opt.OnConnect = func(ctx context.Context, cn *Conn) error {
1070 if err := cn.Select(ctx, failoverOpt.DB).Err(); err != nil {
1071 return err
1072 }
1073
1074 if onConnect != nil {
1075 return onConnect(ctx, cn)
1076 }
1077
1078 return nil
1079 }
1080 }
1081
1082 opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
1083 masterAddr, err := failover.MasterAddr(ctx)
1084 if err != nil {
1085 return nil, err
1086 }
1087
1088 nodes := []ClusterNode{{
1089 Addr: masterAddr,
1090 }}
1091
1092 replicaAddrs, err := failover.replicaAddrs(ctx, false)
1093 if err != nil {
1094 return nil, err
1095 }
1096
1097 for _, replicaAddr := range replicaAddrs {
1098 nodes = append(nodes, ClusterNode{
1099 Addr: replicaAddr,
1100 })
1101 }
1102
1103 slots := []ClusterSlot{
1104 {
1105 Start: 0,
1106 End: 16383,
1107 Nodes: nodes,
1108 },
1109 }
1110 return slots, nil
1111 }
1112
1113 c := NewClusterClient(opt)
1114
1115 failover.mu.Lock()
1116 failover.onUpdate = func(ctx context.Context) {
1117 c.ReloadState(ctx)
1118 }
1119 failover.mu.Unlock()
1120
1121 return c
1122 }
1123
View as plain text