...

Source file src/github.com/redis/go-redis/v9/pubsub_test.go

Documentation: github.com/redis/go-redis/v9

     1  package redis_test
     2  
     3  import (
     4  	"io"
     5  	"net"
     6  	"sync"
     7  	"time"
     8  
     9  	. "github.com/bsm/ginkgo/v2"
    10  	. "github.com/bsm/gomega"
    11  
    12  	"github.com/redis/go-redis/v9"
    13  )
    14  
    15  var _ = Describe("PubSub", func() {
    16  	var client *redis.Client
    17  
    18  	BeforeEach(func() {
    19  		opt := redisOptions()
    20  		opt.MinIdleConns = 0
    21  		opt.ConnMaxLifetime = 0
    22  		client = redis.NewClient(opt)
    23  		Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
    24  	})
    25  
    26  	AfterEach(func() {
    27  		Expect(client.Close()).NotTo(HaveOccurred())
    28  	})
    29  
    30  	It("implements Stringer", func() {
    31  		pubsub := client.PSubscribe(ctx, "mychannel*")
    32  		defer pubsub.Close()
    33  
    34  		Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
    35  	})
    36  
    37  	It("should support pattern matching", func() {
    38  		pubsub := client.PSubscribe(ctx, "mychannel*")
    39  		defer pubsub.Close()
    40  
    41  		{
    42  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
    43  			Expect(err).NotTo(HaveOccurred())
    44  			subscr := msgi.(*redis.Subscription)
    45  			Expect(subscr.Kind).To(Equal("psubscribe"))
    46  			Expect(subscr.Channel).To(Equal("mychannel*"))
    47  			Expect(subscr.Count).To(Equal(1))
    48  		}
    49  
    50  		{
    51  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
    52  			Expect(err.(net.Error).Timeout()).To(Equal(true))
    53  			Expect(msgi).To(BeNil())
    54  		}
    55  
    56  		n, err := client.Publish(ctx, "mychannel1", "hello").Result()
    57  		Expect(err).NotTo(HaveOccurred())
    58  		Expect(n).To(Equal(int64(1)))
    59  
    60  		Expect(pubsub.PUnsubscribe(ctx, "mychannel*")).NotTo(HaveOccurred())
    61  
    62  		{
    63  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
    64  			Expect(err).NotTo(HaveOccurred())
    65  			subscr := msgi.(*redis.Message)
    66  			Expect(subscr.Channel).To(Equal("mychannel1"))
    67  			Expect(subscr.Pattern).To(Equal("mychannel*"))
    68  			Expect(subscr.Payload).To(Equal("hello"))
    69  		}
    70  
    71  		{
    72  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
    73  			Expect(err).NotTo(HaveOccurred())
    74  			subscr := msgi.(*redis.Subscription)
    75  			Expect(subscr.Kind).To(Equal("punsubscribe"))
    76  			Expect(subscr.Channel).To(Equal("mychannel*"))
    77  			Expect(subscr.Count).To(Equal(0))
    78  		}
    79  
    80  		stats := client.PoolStats()
    81  		Expect(stats.Misses).To(Equal(uint32(1)))
    82  	})
    83  
    84  	It("should pub/sub channels", func() {
    85  		channels, err := client.PubSubChannels(ctx, "mychannel*").Result()
    86  		Expect(err).NotTo(HaveOccurred())
    87  		Expect(channels).To(BeEmpty())
    88  
    89  		pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
    90  		defer pubsub.Close()
    91  
    92  		// sleep a bit to make sure redis knows about the subscriptions
    93  		time.Sleep(10 * time.Millisecond)
    94  
    95  		channels, err = client.PubSubChannels(ctx, "mychannel*").Result()
    96  		Expect(err).NotTo(HaveOccurred())
    97  		Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
    98  
    99  		channels, err = client.PubSubChannels(ctx, "").Result()
   100  		Expect(err).NotTo(HaveOccurred())
   101  		Expect(channels).To(BeEmpty())
   102  
   103  		channels, err = client.PubSubChannels(ctx, "*").Result()
   104  		Expect(err).NotTo(HaveOccurred())
   105  		Expect(len(channels)).To(BeNumerically(">=", 2))
   106  	})
   107  
   108  	It("should sharded pub/sub channels", func() {
   109  		channels, err := client.PubSubShardChannels(ctx, "mychannel*").Result()
   110  		Expect(err).NotTo(HaveOccurred())
   111  		Expect(channels).To(BeEmpty())
   112  
   113  		pubsub := client.SSubscribe(ctx, "mychannel", "mychannel2")
   114  		defer pubsub.Close()
   115  
   116  		// Let Redis process the ssubscribe command.
   117  		time.Sleep(10 * time.Millisecond)
   118  
   119  		channels, err = client.PubSubShardChannels(ctx, "mychannel*").Result()
   120  		Expect(err).NotTo(HaveOccurred())
   121  		Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
   122  
   123  		channels, err = client.PubSubShardChannels(ctx, "").Result()
   124  		Expect(err).NotTo(HaveOccurred())
   125  		Expect(channels).To(BeEmpty())
   126  
   127  		channels, err = client.PubSubShardChannels(ctx, "*").Result()
   128  		Expect(err).NotTo(HaveOccurred())
   129  		Expect(len(channels)).To(BeNumerically(">=", 2))
   130  
   131  		nums, err := client.PubSubShardNumSub(ctx, "mychannel", "mychannel2", "mychannel3").Result()
   132  		Expect(err).NotTo(HaveOccurred())
   133  		Expect(nums).To(Equal(map[string]int64{
   134  			"mychannel":  1,
   135  			"mychannel2": 1,
   136  			"mychannel3": 0,
   137  		}))
   138  	})
   139  
   140  	It("should return the numbers of subscribers", func() {
   141  		pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
   142  		defer pubsub.Close()
   143  
   144  		// sleep a bit to make sure redis knows about the subscriptions
   145  		time.Sleep(10 * time.Millisecond)
   146  		channels, err := client.PubSubNumSub(ctx, "mychannel", "mychannel2", "mychannel3").Result()
   147  		Expect(err).NotTo(HaveOccurred())
   148  		Expect(channels).To(Equal(map[string]int64{
   149  			"mychannel":  1,
   150  			"mychannel2": 1,
   151  			"mychannel3": 0,
   152  		}))
   153  	})
   154  
   155  	It("should return the numbers of subscribers by pattern", func() {
   156  		num, err := client.PubSubNumPat(ctx).Result()
   157  		Expect(err).NotTo(HaveOccurred())
   158  		Expect(num).To(Equal(int64(0)))
   159  
   160  		pubsub := client.PSubscribe(ctx, "*")
   161  		defer pubsub.Close()
   162  
   163  		// sleep a bit to make sure redis knows about the subscriptions
   164  		time.Sleep(10 * time.Millisecond)
   165  		num, err = client.PubSubNumPat(ctx).Result()
   166  		Expect(err).NotTo(HaveOccurred())
   167  		Expect(num).To(Equal(int64(1)))
   168  	})
   169  
   170  	It("should pub/sub", func() {
   171  		pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
   172  		defer pubsub.Close()
   173  
   174  		{
   175  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   176  			Expect(err).NotTo(HaveOccurred())
   177  			subscr := msgi.(*redis.Subscription)
   178  			Expect(subscr.Kind).To(Equal("subscribe"))
   179  			Expect(subscr.Channel).To(Equal("mychannel"))
   180  			Expect(subscr.Count).To(Equal(1))
   181  		}
   182  
   183  		{
   184  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   185  			Expect(err).NotTo(HaveOccurred())
   186  			subscr := msgi.(*redis.Subscription)
   187  			Expect(subscr.Kind).To(Equal("subscribe"))
   188  			Expect(subscr.Channel).To(Equal("mychannel2"))
   189  			Expect(subscr.Count).To(Equal(2))
   190  		}
   191  
   192  		{
   193  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   194  			Expect(err.(net.Error).Timeout()).To(Equal(true))
   195  			Expect(msgi).NotTo(HaveOccurred())
   196  		}
   197  
   198  		n, err := client.Publish(ctx, "mychannel", "hello").Result()
   199  		Expect(err).NotTo(HaveOccurred())
   200  		Expect(n).To(Equal(int64(1)))
   201  
   202  		n, err = client.Publish(ctx, "mychannel2", "hello2").Result()
   203  		Expect(err).NotTo(HaveOccurred())
   204  		Expect(n).To(Equal(int64(1)))
   205  
   206  		Expect(pubsub.Unsubscribe(ctx, "mychannel", "mychannel2")).NotTo(HaveOccurred())
   207  
   208  		{
   209  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   210  			Expect(err).NotTo(HaveOccurred())
   211  			msg := msgi.(*redis.Message)
   212  			Expect(msg.Channel).To(Equal("mychannel"))
   213  			Expect(msg.Payload).To(Equal("hello"))
   214  		}
   215  
   216  		{
   217  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   218  			Expect(err).NotTo(HaveOccurred())
   219  			msg := msgi.(*redis.Message)
   220  			Expect(msg.Channel).To(Equal("mychannel2"))
   221  			Expect(msg.Payload).To(Equal("hello2"))
   222  		}
   223  
   224  		{
   225  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   226  			Expect(err).NotTo(HaveOccurred())
   227  			subscr := msgi.(*redis.Subscription)
   228  			Expect(subscr.Kind).To(Equal("unsubscribe"))
   229  			Expect(subscr.Channel).To(Equal("mychannel"))
   230  			Expect(subscr.Count).To(Equal(1))
   231  		}
   232  
   233  		{
   234  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   235  			Expect(err).NotTo(HaveOccurred())
   236  			subscr := msgi.(*redis.Subscription)
   237  			Expect(subscr.Kind).To(Equal("unsubscribe"))
   238  			Expect(subscr.Channel).To(Equal("mychannel2"))
   239  			Expect(subscr.Count).To(Equal(0))
   240  		}
   241  
   242  		stats := client.PoolStats()
   243  		Expect(stats.Misses).To(Equal(uint32(1)))
   244  	})
   245  
   246  	It("should sharded pub/sub", func() {
   247  		pubsub := client.SSubscribe(ctx, "mychannel", "mychannel2")
   248  		defer pubsub.Close()
   249  
   250  		{
   251  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   252  			Expect(err).NotTo(HaveOccurred())
   253  			subscr := msgi.(*redis.Subscription)
   254  			Expect(subscr.Kind).To(Equal("ssubscribe"))
   255  			Expect(subscr.Channel).To(Equal("mychannel"))
   256  			Expect(subscr.Count).To(Equal(1))
   257  		}
   258  
   259  		{
   260  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   261  			Expect(err).NotTo(HaveOccurred())
   262  			subscr := msgi.(*redis.Subscription)
   263  			Expect(subscr.Kind).To(Equal("ssubscribe"))
   264  			Expect(subscr.Channel).To(Equal("mychannel2"))
   265  			Expect(subscr.Count).To(Equal(2))
   266  		}
   267  
   268  		{
   269  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   270  			Expect(err.(net.Error).Timeout()).To(Equal(true))
   271  			Expect(msgi).NotTo(HaveOccurred())
   272  		}
   273  
   274  		n, err := client.SPublish(ctx, "mychannel", "hello").Result()
   275  		Expect(err).NotTo(HaveOccurred())
   276  		Expect(n).To(Equal(int64(1)))
   277  
   278  		n, err = client.SPublish(ctx, "mychannel2", "hello2").Result()
   279  		Expect(err).NotTo(HaveOccurred())
   280  		Expect(n).To(Equal(int64(1)))
   281  
   282  		Expect(pubsub.SUnsubscribe(ctx, "mychannel", "mychannel2")).NotTo(HaveOccurred())
   283  
   284  		{
   285  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   286  			Expect(err).NotTo(HaveOccurred())
   287  			msg := msgi.(*redis.Message)
   288  			Expect(msg.Channel).To(Equal("mychannel"))
   289  			Expect(msg.Payload).To(Equal("hello"))
   290  		}
   291  
   292  		{
   293  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   294  			Expect(err).NotTo(HaveOccurred())
   295  			msg := msgi.(*redis.Message)
   296  			Expect(msg.Channel).To(Equal("mychannel2"))
   297  			Expect(msg.Payload).To(Equal("hello2"))
   298  		}
   299  
   300  		{
   301  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   302  			Expect(err).NotTo(HaveOccurred())
   303  			subscr := msgi.(*redis.Subscription)
   304  			Expect(subscr.Kind).To(Equal("sunsubscribe"))
   305  			Expect(subscr.Channel).To(Equal("mychannel"))
   306  			Expect(subscr.Count).To(Equal(1))
   307  		}
   308  
   309  		{
   310  			msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   311  			Expect(err).NotTo(HaveOccurred())
   312  			subscr := msgi.(*redis.Subscription)
   313  			Expect(subscr.Kind).To(Equal("sunsubscribe"))
   314  			Expect(subscr.Channel).To(Equal("mychannel2"))
   315  			Expect(subscr.Count).To(Equal(0))
   316  		}
   317  
   318  		stats := client.PoolStats()
   319  		Expect(stats.Misses).To(Equal(uint32(1)))
   320  	})
   321  
   322  	It("should ping/pong", func() {
   323  		pubsub := client.Subscribe(ctx, "mychannel")
   324  		defer pubsub.Close()
   325  
   326  		_, err := pubsub.ReceiveTimeout(ctx, time.Second)
   327  		Expect(err).NotTo(HaveOccurred())
   328  
   329  		err = pubsub.Ping(ctx, "")
   330  		Expect(err).NotTo(HaveOccurred())
   331  
   332  		msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   333  		Expect(err).NotTo(HaveOccurred())
   334  		pong := msgi.(*redis.Pong)
   335  		Expect(pong.Payload).To(Equal(""))
   336  	})
   337  
   338  	It("should ping/pong with payload", func() {
   339  		pubsub := client.Subscribe(ctx, "mychannel")
   340  		defer pubsub.Close()
   341  
   342  		_, err := pubsub.ReceiveTimeout(ctx, time.Second)
   343  		Expect(err).NotTo(HaveOccurred())
   344  
   345  		err = pubsub.Ping(ctx, "hello")
   346  		Expect(err).NotTo(HaveOccurred())
   347  
   348  		msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
   349  		Expect(err).NotTo(HaveOccurred())
   350  		pong := msgi.(*redis.Pong)
   351  		Expect(pong.Payload).To(Equal("hello"))
   352  	})
   353  
   354  	It("should multi-ReceiveMessage", func() {
   355  		pubsub := client.Subscribe(ctx, "mychannel")
   356  		defer pubsub.Close()
   357  
   358  		subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
   359  		Expect(err).NotTo(HaveOccurred())
   360  		Expect(subscr).To(Equal(&redis.Subscription{
   361  			Kind:    "subscribe",
   362  			Channel: "mychannel",
   363  			Count:   1,
   364  		}))
   365  
   366  		err = client.Publish(ctx, "mychannel", "hello").Err()
   367  		Expect(err).NotTo(HaveOccurred())
   368  
   369  		err = client.Publish(ctx, "mychannel", "world").Err()
   370  		Expect(err).NotTo(HaveOccurred())
   371  
   372  		msg, err := pubsub.ReceiveMessage(ctx)
   373  		Expect(err).NotTo(HaveOccurred())
   374  		Expect(msg.Channel).To(Equal("mychannel"))
   375  		Expect(msg.Payload).To(Equal("hello"))
   376  
   377  		msg, err = pubsub.ReceiveMessage(ctx)
   378  		Expect(err).NotTo(HaveOccurred())
   379  		Expect(msg.Channel).To(Equal("mychannel"))
   380  		Expect(msg.Payload).To(Equal("world"))
   381  	})
   382  
   383  	It("returns an error when subscribe fails", func() {
   384  		pubsub := client.Subscribe(ctx)
   385  		defer pubsub.Close()
   386  
   387  		pubsub.SetNetConn(&badConn{
   388  			readErr:  io.EOF,
   389  			writeErr: io.EOF,
   390  		})
   391  
   392  		err := pubsub.Subscribe(ctx, "mychannel")
   393  		Expect(err).To(MatchError("EOF"))
   394  
   395  		err = pubsub.Subscribe(ctx, "mychannel")
   396  		Expect(err).NotTo(HaveOccurred())
   397  	})
   398  
   399  	expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
   400  		pubsub.SetNetConn(&badConn{
   401  			readErr:  io.EOF,
   402  			writeErr: io.EOF,
   403  		})
   404  
   405  		step := make(chan struct{}, 3)
   406  
   407  		go func() {
   408  			defer GinkgoRecover()
   409  
   410  			Eventually(step).Should(Receive())
   411  			err := client.Publish(ctx, "mychannel", "hello").Err()
   412  			Expect(err).NotTo(HaveOccurred())
   413  			step <- struct{}{}
   414  		}()
   415  
   416  		_, err := pubsub.ReceiveMessage(ctx)
   417  		Expect(err).To(Equal(io.EOF))
   418  		step <- struct{}{}
   419  
   420  		msg, err := pubsub.ReceiveMessage(ctx)
   421  		Expect(err).NotTo(HaveOccurred())
   422  		Expect(msg.Channel).To(Equal("mychannel"))
   423  		Expect(msg.Payload).To(Equal("hello"))
   424  
   425  		Eventually(step).Should(Receive())
   426  	}
   427  
   428  	It("Subscribe should reconnect on ReceiveMessage error", func() {
   429  		pubsub := client.Subscribe(ctx, "mychannel")
   430  		defer pubsub.Close()
   431  
   432  		subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
   433  		Expect(err).NotTo(HaveOccurred())
   434  		Expect(subscr).To(Equal(&redis.Subscription{
   435  			Kind:    "subscribe",
   436  			Channel: "mychannel",
   437  			Count:   1,
   438  		}))
   439  
   440  		expectReceiveMessageOnError(pubsub)
   441  	})
   442  
   443  	It("PSubscribe should reconnect on ReceiveMessage error", func() {
   444  		pubsub := client.PSubscribe(ctx, "mychannel")
   445  		defer pubsub.Close()
   446  
   447  		subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
   448  		Expect(err).NotTo(HaveOccurred())
   449  		Expect(subscr).To(Equal(&redis.Subscription{
   450  			Kind:    "psubscribe",
   451  			Channel: "mychannel",
   452  			Count:   1,
   453  		}))
   454  
   455  		expectReceiveMessageOnError(pubsub)
   456  	})
   457  
   458  	It("should return on Close", func() {
   459  		pubsub := client.Subscribe(ctx, "mychannel")
   460  		defer pubsub.Close()
   461  
   462  		var wg sync.WaitGroup
   463  		wg.Add(1)
   464  		go func() {
   465  			defer GinkgoRecover()
   466  
   467  			wg.Done()
   468  			defer wg.Done()
   469  
   470  			_, err := pubsub.ReceiveMessage(ctx)
   471  			Expect(err).To(HaveOccurred())
   472  			Expect(err.Error()).To(SatisfyAny(
   473  				Equal("redis: client is closed"),
   474  				ContainSubstring("use of closed network connection"),
   475  			))
   476  		}()
   477  
   478  		wg.Wait()
   479  		wg.Add(1)
   480  
   481  		Expect(pubsub.Close()).NotTo(HaveOccurred())
   482  
   483  		wg.Wait()
   484  	})
   485  
   486  	It("should ReceiveMessage without a subscription", func() {
   487  		timeout := 100 * time.Millisecond
   488  
   489  		pubsub := client.Subscribe(ctx)
   490  		defer pubsub.Close()
   491  
   492  		var wg sync.WaitGroup
   493  		wg.Add(1)
   494  		go func() {
   495  			defer GinkgoRecover()
   496  			defer wg.Done()
   497  
   498  			time.Sleep(timeout)
   499  
   500  			err := pubsub.Subscribe(ctx, "mychannel")
   501  			Expect(err).NotTo(HaveOccurred())
   502  
   503  			time.Sleep(timeout)
   504  
   505  			err = client.Publish(ctx, "mychannel", "hello").Err()
   506  			Expect(err).NotTo(HaveOccurred())
   507  		}()
   508  
   509  		msg, err := pubsub.ReceiveMessage(ctx)
   510  		Expect(err).NotTo(HaveOccurred())
   511  		Expect(msg.Channel).To(Equal("mychannel"))
   512  		Expect(msg.Payload).To(Equal("hello"))
   513  
   514  		wg.Wait()
   515  	})
   516  
   517  	It("handles big message payload", func() {
   518  		pubsub := client.Subscribe(ctx, "mychannel")
   519  		defer pubsub.Close()
   520  
   521  		ch := pubsub.Channel()
   522  
   523  		bigVal := bigVal()
   524  		err := client.Publish(ctx, "mychannel", bigVal).Err()
   525  		Expect(err).NotTo(HaveOccurred())
   526  
   527  		var msg *redis.Message
   528  		Eventually(ch).Should(Receive(&msg))
   529  		Expect(msg.Channel).To(Equal("mychannel"))
   530  		Expect(msg.Payload).To(Equal(string(bigVal)))
   531  	})
   532  
   533  	It("supports concurrent Ping and Receive", func() {
   534  		const N = 100
   535  
   536  		pubsub := client.Subscribe(ctx, "mychannel")
   537  		defer pubsub.Close()
   538  
   539  		done := make(chan struct{})
   540  		go func() {
   541  			defer GinkgoRecover()
   542  
   543  			for i := 0; i < N; i++ {
   544  				_, err := pubsub.ReceiveTimeout(ctx, 5*time.Second)
   545  				Expect(err).NotTo(HaveOccurred())
   546  			}
   547  			close(done)
   548  		}()
   549  
   550  		for i := 0; i < N; i++ {
   551  			err := pubsub.Ping(ctx)
   552  			Expect(err).NotTo(HaveOccurred())
   553  		}
   554  
   555  		select {
   556  		case <-done:
   557  		case <-time.After(30 * time.Second):
   558  			Fail("timeout")
   559  		}
   560  	})
   561  
   562  	It("should ChannelMessage", func() {
   563  		pubsub := client.Subscribe(ctx, "mychannel")
   564  		defer pubsub.Close()
   565  
   566  		ch := pubsub.Channel(
   567  			redis.WithChannelSize(10),
   568  			redis.WithChannelHealthCheckInterval(time.Second),
   569  		)
   570  
   571  		text := "test channel message"
   572  		err := client.Publish(ctx, "mychannel", text).Err()
   573  		Expect(err).NotTo(HaveOccurred())
   574  
   575  		var msg *redis.Message
   576  		Eventually(ch).Should(Receive(&msg))
   577  		Expect(msg.Channel).To(Equal("mychannel"))
   578  		Expect(msg.Payload).To(Equal(text))
   579  	})
   580  })
   581  

View as plain text