...

Source file src/github.com/redis/go-redis/v9/stream_commands.go

Documentation: github.com/redis/go-redis/v9

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  )
     7  
     8  type StreamCmdable interface {
     9  	XAdd(ctx context.Context, a *XAddArgs) *StringCmd
    10  	XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd
    11  	XDel(ctx context.Context, stream string, ids ...string) *IntCmd
    12  	XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd
    13  	XLen(ctx context.Context, stream string) *IntCmd
    14  	XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
    15  	XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
    16  	XRevRange(ctx context.Context, stream string, start, stop string) *XMessageSliceCmd
    17  	XRevRangeN(ctx context.Context, stream string, start, stop string, count int64) *XMessageSliceCmd
    18  	XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd
    19  	XReadStreams(ctx context.Context, streams ...string) *XStreamSliceCmd
    20  	XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd
    21  	XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd
    22  	XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd
    23  	XGroupDestroy(ctx context.Context, stream, group string) *IntCmd
    24  	XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
    25  	XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
    26  	XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
    27  	XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
    28  	XPending(ctx context.Context, stream, group string) *XPendingCmd
    29  	XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
    30  	XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
    31  	XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
    32  	XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd
    33  	XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
    34  	XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
    35  	XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
    36  	XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd
    37  	XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd
    38  	XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
    39  	XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
    40  	XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd
    41  	XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd
    42  	XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
    43  	XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
    44  	XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
    45  	XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
    46  }
    47  
    48  // XAddArgs accepts values in the following formats:
    49  //   - XAddArgs.Values = []interface{}{"key1", "value1", "key2", "value2"}
    50  //   - XAddArgs.Values = []string("key1", "value1", "key2", "value2")
    51  //   - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"}
    52  //
    53  // Note that map will not preserve the order of key-value pairs.
    54  // MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used.
    55  type XAddArgs struct {
    56  	Stream     string
    57  	NoMkStream bool
    58  	MaxLen     int64 // MAXLEN N
    59  	MinID      string
    60  	// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
    61  	Approx bool
    62  	Limit  int64
    63  	Mode   string
    64  	ID     string
    65  	Values interface{}
    66  }
    67  
    68  func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
    69  	args := make([]interface{}, 0, 11)
    70  	args = append(args, "xadd", a.Stream)
    71  	if a.NoMkStream {
    72  		args = append(args, "nomkstream")
    73  	}
    74  	switch {
    75  	case a.MaxLen > 0:
    76  		if a.Approx {
    77  			args = append(args, "maxlen", "~", a.MaxLen)
    78  		} else {
    79  			args = append(args, "maxlen", a.MaxLen)
    80  		}
    81  	case a.MinID != "":
    82  		if a.Approx {
    83  			args = append(args, "minid", "~", a.MinID)
    84  		} else {
    85  			args = append(args, "minid", a.MinID)
    86  		}
    87  	}
    88  	if a.Limit > 0 {
    89  		args = append(args, "limit", a.Limit)
    90  	}
    91  
    92  	if a.Mode != "" {
    93  		args = append(args, a.Mode)
    94  	}
    95  
    96  	if a.ID != "" {
    97  		args = append(args, a.ID)
    98  	} else {
    99  		args = append(args, "*")
   100  	}
   101  	args = appendArg(args, a.Values)
   102  
   103  	cmd := NewStringCmd(ctx, args...)
   104  	_ = c(ctx, cmd)
   105  	return cmd
   106  }
   107  
   108  func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd {
   109  	args := []interface{}{"xackdel", stream, group, mode, "ids", len(ids)}
   110  	for _, id := range ids {
   111  		args = append(args, id)
   112  	}
   113  	cmd := NewSliceCmd(ctx, args...)
   114  	_ = c(ctx, cmd)
   115  	return cmd
   116  }
   117  
   118  func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
   119  	args := []interface{}{"xdel", stream}
   120  	for _, id := range ids {
   121  		args = append(args, id)
   122  	}
   123  	cmd := NewIntCmd(ctx, args...)
   124  	_ = c(ctx, cmd)
   125  	return cmd
   126  }
   127  
   128  func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd {
   129  	args := []interface{}{"xdelex", stream, mode, "ids", len(ids)}
   130  	for _, id := range ids {
   131  		args = append(args, id)
   132  	}
   133  	cmd := NewSliceCmd(ctx, args...)
   134  	_ = c(ctx, cmd)
   135  	return cmd
   136  }
   137  
   138  func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
   139  	cmd := NewIntCmd(ctx, "xlen", stream)
   140  	_ = c(ctx, cmd)
   141  	return cmd
   142  }
   143  
   144  func (c cmdable) XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd {
   145  	cmd := NewXMessageSliceCmd(ctx, "xrange", stream, start, stop)
   146  	_ = c(ctx, cmd)
   147  	return cmd
   148  }
   149  
   150  func (c cmdable) XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd {
   151  	cmd := NewXMessageSliceCmd(ctx, "xrange", stream, start, stop, "count", count)
   152  	_ = c(ctx, cmd)
   153  	return cmd
   154  }
   155  
   156  func (c cmdable) XRevRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd {
   157  	cmd := NewXMessageSliceCmd(ctx, "xrevrange", stream, start, stop)
   158  	_ = c(ctx, cmd)
   159  	return cmd
   160  }
   161  
   162  func (c cmdable) XRevRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd {
   163  	cmd := NewXMessageSliceCmd(ctx, "xrevrange", stream, start, stop, "count", count)
   164  	_ = c(ctx, cmd)
   165  	return cmd
   166  }
   167  
   168  type XReadArgs struct {
   169  	Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
   170  	Count   int64
   171  	Block   time.Duration
   172  	ID      string
   173  }
   174  
   175  func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
   176  	args := make([]interface{}, 0, 2*len(a.Streams)+6)
   177  	args = append(args, "xread")
   178  
   179  	keyPos := int8(1)
   180  	if a.Count > 0 {
   181  		args = append(args, "count")
   182  		args = append(args, a.Count)
   183  		keyPos += 2
   184  	}
   185  	if a.Block >= 0 {
   186  		args = append(args, "block")
   187  		args = append(args, int64(a.Block/time.Millisecond))
   188  		keyPos += 2
   189  	}
   190  	args = append(args, "streams")
   191  	keyPos++
   192  	for _, s := range a.Streams {
   193  		args = append(args, s)
   194  	}
   195  	if a.ID != "" {
   196  		for range a.Streams {
   197  			args = append(args, a.ID)
   198  		}
   199  	}
   200  
   201  	cmd := NewXStreamSliceCmd(ctx, args...)
   202  	if a.Block >= 0 {
   203  		cmd.setReadTimeout(a.Block)
   204  	}
   205  	cmd.SetFirstKeyPos(keyPos)
   206  	_ = c(ctx, cmd)
   207  	return cmd
   208  }
   209  
   210  func (c cmdable) XReadStreams(ctx context.Context, streams ...string) *XStreamSliceCmd {
   211  	return c.XRead(ctx, &XReadArgs{
   212  		Streams: streams,
   213  		Block:   -1,
   214  	})
   215  }
   216  
   217  func (c cmdable) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd {
   218  	cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start)
   219  	cmd.SetFirstKeyPos(2)
   220  	_ = c(ctx, cmd)
   221  	return cmd
   222  }
   223  
   224  func (c cmdable) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd {
   225  	cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start, "mkstream")
   226  	cmd.SetFirstKeyPos(2)
   227  	_ = c(ctx, cmd)
   228  	return cmd
   229  }
   230  
   231  func (c cmdable) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd {
   232  	cmd := NewStatusCmd(ctx, "xgroup", "setid", stream, group, start)
   233  	cmd.SetFirstKeyPos(2)
   234  	_ = c(ctx, cmd)
   235  	return cmd
   236  }
   237  
   238  func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd {
   239  	cmd := NewIntCmd(ctx, "xgroup", "destroy", stream, group)
   240  	cmd.SetFirstKeyPos(2)
   241  	_ = c(ctx, cmd)
   242  	return cmd
   243  }
   244  
   245  func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
   246  	cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
   247  	cmd.SetFirstKeyPos(2)
   248  	_ = c(ctx, cmd)
   249  	return cmd
   250  }
   251  
   252  func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
   253  	cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
   254  	cmd.SetFirstKeyPos(2)
   255  	_ = c(ctx, cmd)
   256  	return cmd
   257  }
   258  
   259  type XReadGroupArgs struct {
   260  	Group    string
   261  	Consumer string
   262  	Streams  []string // list of streams and ids, e.g. stream1 stream2 id1 id2
   263  	Count    int64
   264  	Block    time.Duration
   265  	NoAck    bool
   266  }
   267  
   268  func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd {
   269  	args := make([]interface{}, 0, 10+len(a.Streams))
   270  	args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
   271  
   272  	keyPos := int8(4)
   273  	if a.Count > 0 {
   274  		args = append(args, "count", a.Count)
   275  		keyPos += 2
   276  	}
   277  	if a.Block >= 0 {
   278  		args = append(args, "block", int64(a.Block/time.Millisecond))
   279  		keyPos += 2
   280  	}
   281  	if a.NoAck {
   282  		args = append(args, "noack")
   283  		keyPos++
   284  	}
   285  	args = append(args, "streams")
   286  	keyPos++
   287  	for _, s := range a.Streams {
   288  		args = append(args, s)
   289  	}
   290  
   291  	cmd := NewXStreamSliceCmd(ctx, args...)
   292  	if a.Block >= 0 {
   293  		cmd.setReadTimeout(a.Block)
   294  	}
   295  	cmd.SetFirstKeyPos(keyPos)
   296  	_ = c(ctx, cmd)
   297  	return cmd
   298  }
   299  
   300  func (c cmdable) XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd {
   301  	args := []interface{}{"xack", stream, group}
   302  	for _, id := range ids {
   303  		args = append(args, id)
   304  	}
   305  	cmd := NewIntCmd(ctx, args...)
   306  	_ = c(ctx, cmd)
   307  	return cmd
   308  }
   309  
   310  func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCmd {
   311  	cmd := NewXPendingCmd(ctx, "xpending", stream, group)
   312  	_ = c(ctx, cmd)
   313  	return cmd
   314  }
   315  
   316  type XPendingExtArgs struct {
   317  	Stream   string
   318  	Group    string
   319  	Idle     time.Duration
   320  	Start    string
   321  	End      string
   322  	Count    int64
   323  	Consumer string
   324  }
   325  
   326  func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd {
   327  	args := make([]interface{}, 0, 9)
   328  	args = append(args, "xpending", a.Stream, a.Group)
   329  	if a.Idle != 0 {
   330  		args = append(args, "idle", formatMs(ctx, a.Idle))
   331  	}
   332  	args = append(args, a.Start, a.End, a.Count)
   333  	if a.Consumer != "" {
   334  		args = append(args, a.Consumer)
   335  	}
   336  	cmd := NewXPendingExtCmd(ctx, args...)
   337  	_ = c(ctx, cmd)
   338  	return cmd
   339  }
   340  
   341  type XAutoClaimArgs struct {
   342  	Stream   string
   343  	Group    string
   344  	MinIdle  time.Duration
   345  	Start    string
   346  	Count    int64
   347  	Consumer string
   348  }
   349  
   350  func (c cmdable) XAutoClaim(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimCmd {
   351  	args := xAutoClaimArgs(ctx, a)
   352  	cmd := NewXAutoClaimCmd(ctx, args...)
   353  	_ = c(ctx, cmd)
   354  	return cmd
   355  }
   356  
   357  func (c cmdable) XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd {
   358  	args := xAutoClaimArgs(ctx, a)
   359  	args = append(args, "justid")
   360  	cmd := NewXAutoClaimJustIDCmd(ctx, args...)
   361  	_ = c(ctx, cmd)
   362  	return cmd
   363  }
   364  
   365  func xAutoClaimArgs(ctx context.Context, a *XAutoClaimArgs) []interface{} {
   366  	args := make([]interface{}, 0, 8)
   367  	args = append(args, "xautoclaim", a.Stream, a.Group, a.Consumer, formatMs(ctx, a.MinIdle), a.Start)
   368  	if a.Count > 0 {
   369  		args = append(args, "count", a.Count)
   370  	}
   371  	return args
   372  }
   373  
   374  type XClaimArgs struct {
   375  	Stream   string
   376  	Group    string
   377  	Consumer string
   378  	MinIdle  time.Duration
   379  	Messages []string
   380  }
   381  
   382  func (c cmdable) XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd {
   383  	args := xClaimArgs(a)
   384  	cmd := NewXMessageSliceCmd(ctx, args...)
   385  	_ = c(ctx, cmd)
   386  	return cmd
   387  }
   388  
   389  func (c cmdable) XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd {
   390  	args := xClaimArgs(a)
   391  	args = append(args, "justid")
   392  	cmd := NewStringSliceCmd(ctx, args...)
   393  	_ = c(ctx, cmd)
   394  	return cmd
   395  }
   396  
   397  func xClaimArgs(a *XClaimArgs) []interface{} {
   398  	args := make([]interface{}, 0, 5+len(a.Messages))
   399  	args = append(args,
   400  		"xclaim",
   401  		a.Stream,
   402  		a.Group, a.Consumer,
   403  		int64(a.MinIdle/time.Millisecond))
   404  	for _, id := range a.Messages {
   405  		args = append(args, id)
   406  	}
   407  	return args
   408  }
   409  
   410  // TODO: refactor xTrim, xTrimMode and the wrappers over the functions
   411  
   412  // xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
   413  // example:
   414  //
   415  //	XTRIM key MAXLEN/MINID threshold LIMIT limit.
   416  //	XTRIM key MAXLEN/MINID ~ threshold LIMIT limit.
   417  //
   418  // The redis-server version is lower than 6.2, please set limit to 0.
   419  func (c cmdable) xTrim(
   420  	ctx context.Context, key, strategy string,
   421  	approx bool, threshold interface{}, limit int64,
   422  ) *IntCmd {
   423  	args := make([]interface{}, 0, 7)
   424  	args = append(args, "xtrim", key, strategy)
   425  	if approx {
   426  		args = append(args, "~")
   427  	}
   428  	args = append(args, threshold)
   429  	if limit > 0 {
   430  		args = append(args, "limit", limit)
   431  	}
   432  	cmd := NewIntCmd(ctx, args...)
   433  	_ = c(ctx, cmd)
   434  	return cmd
   435  }
   436  
   437  // XTrimMaxLen No `~` rules are used, `limit` cannot be used.
   438  // cmd: XTRIM key MAXLEN maxLen
   439  func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
   440  	return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
   441  }
   442  
   443  func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd {
   444  	return c.xTrim(ctx, key, "maxlen", true, maxLen, limit)
   445  }
   446  
   447  func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd {
   448  	return c.xTrim(ctx, key, "minid", false, minID, 0)
   449  }
   450  
   451  func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd {
   452  	return c.xTrim(ctx, key, "minid", true, minID, limit)
   453  }
   454  
   455  func (c cmdable) xTrimMode(
   456  	ctx context.Context, key, strategy string,
   457  	approx bool, threshold interface{}, limit int64,
   458  	mode string,
   459  ) *IntCmd {
   460  	args := make([]interface{}, 0, 7)
   461  	args = append(args, "xtrim", key, strategy)
   462  	if approx {
   463  		args = append(args, "~")
   464  	}
   465  	args = append(args, threshold)
   466  	if limit > 0 {
   467  		args = append(args, "limit", limit)
   468  	}
   469  	args = append(args, mode)
   470  	cmd := NewIntCmd(ctx, args...)
   471  	_ = c(ctx, cmd)
   472  	return cmd
   473  }
   474  
   475  func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd {
   476  	return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode)
   477  }
   478  
   479  func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd {
   480  	return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode)
   481  }
   482  
   483  func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd {
   484  	return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode)
   485  }
   486  
   487  func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd {
   488  	return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode)
   489  }
   490  
   491  func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
   492  	cmd := NewXInfoConsumersCmd(ctx, key, group)
   493  	_ = c(ctx, cmd)
   494  	return cmd
   495  }
   496  
   497  func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd {
   498  	cmd := NewXInfoGroupsCmd(ctx, key)
   499  	_ = c(ctx, cmd)
   500  	return cmd
   501  }
   502  
   503  func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd {
   504  	cmd := NewXInfoStreamCmd(ctx, key)
   505  	_ = c(ctx, cmd)
   506  	return cmd
   507  }
   508  
   509  // XInfoStreamFull XINFO STREAM FULL [COUNT count]
   510  // redis-server >= 6.0.
   511  func (c cmdable) XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd {
   512  	args := make([]interface{}, 0, 6)
   513  	args = append(args, "xinfo", "stream", key, "full")
   514  	if count > 0 {
   515  		args = append(args, "count", count)
   516  	}
   517  	cmd := NewXInfoStreamFullCmd(ctx, args...)
   518  	_ = c(ctx, cmd)
   519  	return cmd
   520  }
   521  

View as plain text