1 package redis
2
3 import (
4 "context"
5 "strconv"
6
7 "github.com/redis/go-redis/v9/internal/proto"
8 )
9
10 type TimeseriesCmdable interface {
11 TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd
12 TSAddWithArgs(ctx context.Context, key string, timestamp interface{}, value float64, options *TSOptions) *IntCmd
13 TSCreate(ctx context.Context, key string) *StatusCmd
14 TSCreateWithArgs(ctx context.Context, key string, options *TSOptions) *StatusCmd
15 TSAlter(ctx context.Context, key string, options *TSAlterOptions) *StatusCmd
16 TSCreateRule(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int) *StatusCmd
17 TSCreateRuleWithArgs(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int, options *TSCreateRuleOptions) *StatusCmd
18 TSIncrBy(ctx context.Context, Key string, timestamp float64) *IntCmd
19 TSIncrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd
20 TSDecrBy(ctx context.Context, Key string, timestamp float64) *IntCmd
21 TSDecrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd
22 TSDel(ctx context.Context, Key string, fromTimestamp int, toTimestamp int) *IntCmd
23 TSDeleteRule(ctx context.Context, sourceKey string, destKey string) *StatusCmd
24 TSGet(ctx context.Context, key string) *TSTimestampValueCmd
25 TSGetWithArgs(ctx context.Context, key string, options *TSGetOptions) *TSTimestampValueCmd
26 TSInfo(ctx context.Context, key string) *MapStringInterfaceCmd
27 TSInfoWithArgs(ctx context.Context, key string, options *TSInfoOptions) *MapStringInterfaceCmd
28 TSMAdd(ctx context.Context, ktvSlices [][]interface{}) *IntSliceCmd
29 TSQueryIndex(ctx context.Context, filterExpr []string) *StringSliceCmd
30 TSRevRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd
31 TSRevRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRevRangeOptions) *TSTimestampValueSliceCmd
32 TSRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd
33 TSRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRangeOptions) *TSTimestampValueSliceCmd
34 TSMRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd
35 TSMRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRangeOptions) *MapStringSliceInterfaceCmd
36 TSMRevRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd
37 TSMRevRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRevRangeOptions) *MapStringSliceInterfaceCmd
38 TSMGet(ctx context.Context, filters []string) *MapStringSliceInterfaceCmd
39 TSMGetWithArgs(ctx context.Context, filters []string, options *TSMGetOptions) *MapStringSliceInterfaceCmd
40 }
41
// TSOptions carries the optional clauses accepted by TSAddWithArgs and
// TSCreateWithArgs. A field left at its zero value is not sent.
type TSOptions struct {
	// Retention and ChunkSize are sent as RETENTION and CHUNK_SIZE when non-zero.
	Retention, ChunkSize int
	// Encoding and DuplicatePolicy are sent as ENCODING and DUPLICATE_POLICY
	// when non-empty.
	Encoding, DuplicatePolicy string
	// Labels is sent as LABELS followed by each name/value pair whenever the
	// map is non-nil (an empty, non-nil map still emits the bare keyword).
	Labels map[string]string
	// IgnoreMaxTimeDiff and IgnoreMaxValDiff are sent together after IGNORE
	// when at least one of them is non-zero.
	IgnoreMaxTimeDiff int64
	IgnoreMaxValDiff  float64
}
// TSIncrDecrOptions carries the optional clauses accepted by
// TSIncrByWithArgs and TSDecrByWithArgs. A field left at its zero value is
// not sent.
type TSIncrDecrOptions struct {
	// Timestamp is sent as TIMESTAMP when non-zero.
	Timestamp int64
	// Retention and ChunkSize are sent as RETENTION and CHUNK_SIZE when non-zero.
	Retention, ChunkSize int
	// Uncompressed adds the UNCOMPRESSED keyword when true.
	Uncompressed bool
	// DuplicatePolicy is sent as DUPLICATE_POLICY when non-empty.
	DuplicatePolicy string
	// Labels is sent as LABELS followed by each name/value pair whenever the
	// map is non-nil.
	Labels map[string]string
	// IgnoreMaxTimeDiff and IgnoreMaxValDiff are sent together after IGNORE
	// when at least one of them is non-zero.
	IgnoreMaxTimeDiff int64
	IgnoreMaxValDiff  float64
}
61
// TSAlterOptions carries the optional clauses accepted by TSAlter. A field
// left at its zero value is not sent.
type TSAlterOptions struct {
	// Retention and ChunkSize are sent as RETENTION and CHUNK_SIZE when non-zero.
	Retention, ChunkSize int
	// DuplicatePolicy is sent as DUPLICATE_POLICY when non-empty.
	DuplicatePolicy string
	// Labels is sent as LABELS followed by each name/value pair whenever the
	// map is non-nil (an empty, non-nil map still emits the bare keyword).
	Labels map[string]string
	// IgnoreMaxTimeDiff and IgnoreMaxValDiff are sent together after IGNORE
	// when at least one of them is non-zero.
	IgnoreMaxTimeDiff int64
	IgnoreMaxValDiff  float64
}
70
// TSCreateRuleOptions carries the optional trailing argument accepted by
// TSCreateRuleWithArgs.
type TSCreateRuleOptions struct {
	// alignTimestamp is appended as the final command argument when non-zero.
	// NOTE(review): the field is unexported, so code outside this package
	// cannot set it — confirm whether that is intentional.
	alignTimestamp int64
}
74
// TSGetOptions carries the optional flag accepted by TSGetWithArgs.
type TSGetOptions struct {
	// Latest adds the LATEST keyword to the command when true.
	Latest bool
}
78
// TSInfoOptions carries the optional flag accepted by TSInfoWithArgs.
type TSInfoOptions struct {
	// Debug adds the DEBUG keyword to the command when true.
	Debug bool
}
// Aggregator selects the aggregation type placed after the AGGREGATION
// keyword by the rule and range commands in this file.
type Aggregator int

// Aggregator values. Invalid is the zero value and renders as the empty
// string; the range commands treat it as "no aggregation requested".
const (
	Invalid Aggregator = iota
	Avg
	Sum
	Min
	Max
	Range
	Count
	First
	Last
	StdP
	StdS
	VarP
	VarS
	Twa
)

// String returns the wire keyword for a, or "" for Invalid and for any value
// outside the declared constants.
func (a Aggregator) String() string {
	// Indexed by constant; index 0 (Invalid) is left as the empty string.
	keywords := [...]string{
		Avg:   "AVG",
		Sum:   "SUM",
		Min:   "MIN",
		Max:   "MAX",
		Range: "RANGE",
		Count: "COUNT",
		First: "FIRST",
		Last:  "LAST",
		StdP:  "STD.P",
		StdS:  "STD.S",
		VarP:  "VAR.P",
		VarS:  "VAR.S",
		Twa:   "TWA",
	}
	if a < 0 || int(a) >= len(keywords) {
		return ""
	}
	return keywords[a]
}
135
136 type TSRangeOptions struct {
137 Latest bool
138 FilterByTS []int
139 FilterByValue []int
140 Count int
141 Align interface{}
142 Aggregator Aggregator
143 BucketDuration int
144 BucketTimestamp interface{}
145 Empty bool
146 }
147
148 type TSRevRangeOptions struct {
149 Latest bool
150 FilterByTS []int
151 FilterByValue []int
152 Count int
153 Align interface{}
154 Aggregator Aggregator
155 BucketDuration int
156 BucketTimestamp interface{}
157 Empty bool
158 }
159
160 type TSMRangeOptions struct {
161 Latest bool
162 FilterByTS []int
163 FilterByValue []int
164 WithLabels bool
165 SelectedLabels []interface{}
166 Count int
167 Align interface{}
168 Aggregator Aggregator
169 BucketDuration int
170 BucketTimestamp interface{}
171 Empty bool
172 GroupByLabel interface{}
173 Reducer interface{}
174 }
175
176 type TSMRevRangeOptions struct {
177 Latest bool
178 FilterByTS []int
179 FilterByValue []int
180 WithLabels bool
181 SelectedLabels []interface{}
182 Count int
183 Align interface{}
184 Aggregator Aggregator
185 BucketDuration int
186 BucketTimestamp interface{}
187 Empty bool
188 GroupByLabel interface{}
189 Reducer interface{}
190 }
191
// TSMGetOptions carries the optional clauses accepted by TSMGetWithArgs.
type TSMGetOptions struct {
	// Latest and WithLabels add the LATEST and WITHLABELS keywords when true.
	Latest, WithLabels bool
	// SelectedLabels, when non-nil, is sent as SELECTED_LABELS followed by
	// each element.
	SelectedLabels []interface{}
}
197
198
199
200 func (c cmdable) TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd {
201 args := []interface{}{"TS.ADD", key, timestamp, value}
202 cmd := NewIntCmd(ctx, args...)
203 _ = c(ctx, cmd)
204 return cmd
205 }
206
207
208
209
210
211 func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interface{}, value float64, options *TSOptions) *IntCmd {
212 args := []interface{}{"TS.ADD", key, timestamp, value}
213 if options != nil {
214 if options.Retention != 0 {
215 args = append(args, "RETENTION", options.Retention)
216 }
217 if options.ChunkSize != 0 {
218 args = append(args, "CHUNK_SIZE", options.ChunkSize)
219 }
220 if options.Encoding != "" {
221 args = append(args, "ENCODING", options.Encoding)
222 }
223
224 if options.DuplicatePolicy != "" {
225 args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
226 }
227 if options.Labels != nil {
228 args = append(args, "LABELS")
229 for label, value := range options.Labels {
230 args = append(args, label, value)
231 }
232 }
233 if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
234 args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
235 }
236 }
237 cmd := NewIntCmd(ctx, args...)
238 _ = c(ctx, cmd)
239 return cmd
240 }
241
242
243
244 func (c cmdable) TSCreate(ctx context.Context, key string) *StatusCmd {
245 args := []interface{}{"TS.CREATE", key}
246 cmd := NewStatusCmd(ctx, args...)
247 _ = c(ctx, cmd)
248 return cmd
249 }
250
251
252
253
254
255 func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOptions) *StatusCmd {
256 args := []interface{}{"TS.CREATE", key}
257 if options != nil {
258 if options.Retention != 0 {
259 args = append(args, "RETENTION", options.Retention)
260 }
261 if options.ChunkSize != 0 {
262 args = append(args, "CHUNK_SIZE", options.ChunkSize)
263 }
264 if options.Encoding != "" {
265 args = append(args, "ENCODING", options.Encoding)
266 }
267
268 if options.DuplicatePolicy != "" {
269 args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
270 }
271 if options.Labels != nil {
272 args = append(args, "LABELS")
273 for label, value := range options.Labels {
274 args = append(args, label, value)
275 }
276 }
277 if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
278 args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
279 }
280 }
281 cmd := NewStatusCmd(ctx, args...)
282 _ = c(ctx, cmd)
283 return cmd
284 }
285
286
287
288
289
290 func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOptions) *StatusCmd {
291 args := []interface{}{"TS.ALTER", key}
292 if options != nil {
293 if options.Retention != 0 {
294 args = append(args, "RETENTION", options.Retention)
295 }
296 if options.ChunkSize != 0 {
297 args = append(args, "CHUNK_SIZE", options.ChunkSize)
298 }
299 if options.DuplicatePolicy != "" {
300 args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
301 }
302 if options.Labels != nil {
303 args = append(args, "LABELS")
304 for label, value := range options.Labels {
305 args = append(args, label, value)
306 }
307 }
308 if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
309 args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
310 }
311 }
312 cmd := NewStatusCmd(ctx, args...)
313 _ = c(ctx, cmd)
314 return cmd
315 }
316
317
318
319 func (c cmdable) TSCreateRule(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int) *StatusCmd {
320 args := []interface{}{"TS.CREATERULE", sourceKey, destKey, "AGGREGATION", aggregator.String(), bucketDuration}
321 cmd := NewStatusCmd(ctx, args...)
322 _ = c(ctx, cmd)
323 return cmd
324 }
325
326
327
328
329
330 func (c cmdable) TSCreateRuleWithArgs(ctx context.Context, sourceKey string, destKey string, aggregator Aggregator, bucketDuration int, options *TSCreateRuleOptions) *StatusCmd {
331 args := []interface{}{"TS.CREATERULE", sourceKey, destKey, "AGGREGATION", aggregator.String(), bucketDuration}
332 if options != nil {
333 if options.alignTimestamp != 0 {
334 args = append(args, options.alignTimestamp)
335 }
336 }
337 cmd := NewStatusCmd(ctx, args...)
338 _ = c(ctx, cmd)
339 return cmd
340 }
341
342
343
344 func (c cmdable) TSIncrBy(ctx context.Context, Key string, timestamp float64) *IntCmd {
345 args := []interface{}{"TS.INCRBY", Key, timestamp}
346 cmd := NewIntCmd(ctx, args...)
347 _ = c(ctx, cmd)
348 return cmd
349 }
350
351
352
353
354
355 func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd {
356 args := []interface{}{"TS.INCRBY", key, timestamp}
357 if options != nil {
358 if options.Timestamp != 0 {
359 args = append(args, "TIMESTAMP", options.Timestamp)
360 }
361 if options.Retention != 0 {
362 args = append(args, "RETENTION", options.Retention)
363 }
364 if options.ChunkSize != 0 {
365 args = append(args, "CHUNK_SIZE", options.ChunkSize)
366 }
367 if options.Uncompressed {
368 args = append(args, "UNCOMPRESSED")
369 }
370 if options.DuplicatePolicy != "" {
371 args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
372 }
373 if options.Labels != nil {
374 args = append(args, "LABELS")
375 for label, value := range options.Labels {
376 args = append(args, label, value)
377 }
378 }
379 if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
380 args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
381 }
382 }
383 cmd := NewIntCmd(ctx, args...)
384 _ = c(ctx, cmd)
385 return cmd
386 }
387
388
389
390 func (c cmdable) TSDecrBy(ctx context.Context, Key string, timestamp float64) *IntCmd {
391 args := []interface{}{"TS.DECRBY", Key, timestamp}
392 cmd := NewIntCmd(ctx, args...)
393 _ = c(ctx, cmd)
394 return cmd
395 }
396
397
398
399
400
401 func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp float64, options *TSIncrDecrOptions) *IntCmd {
402 args := []interface{}{"TS.DECRBY", key, timestamp}
403 if options != nil {
404 if options.Timestamp != 0 {
405 args = append(args, "TIMESTAMP", options.Timestamp)
406 }
407 if options.Retention != 0 {
408 args = append(args, "RETENTION", options.Retention)
409 }
410 if options.ChunkSize != 0 {
411 args = append(args, "CHUNK_SIZE", options.ChunkSize)
412 }
413 if options.Uncompressed {
414 args = append(args, "UNCOMPRESSED")
415 }
416 if options.DuplicatePolicy != "" {
417 args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
418 }
419 if options.Labels != nil {
420 args = append(args, "LABELS")
421 for label, value := range options.Labels {
422 args = append(args, label, value)
423 }
424 }
425 if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
426 args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
427 }
428 }
429 cmd := NewIntCmd(ctx, args...)
430 _ = c(ctx, cmd)
431 return cmd
432 }
433
434
435
436 func (c cmdable) TSDel(ctx context.Context, Key string, fromTimestamp int, toTimestamp int) *IntCmd {
437 args := []interface{}{"TS.DEL", Key, fromTimestamp, toTimestamp}
438 cmd := NewIntCmd(ctx, args...)
439 _ = c(ctx, cmd)
440 return cmd
441 }
442
443
444
445 func (c cmdable) TSDeleteRule(ctx context.Context, sourceKey string, destKey string) *StatusCmd {
446 args := []interface{}{"TS.DELETERULE", sourceKey, destKey}
447 cmd := NewStatusCmd(ctx, args...)
448 _ = c(ctx, cmd)
449 return cmd
450 }
451
452
453
454
455
456 func (c cmdable) TSGetWithArgs(ctx context.Context, key string, options *TSGetOptions) *TSTimestampValueCmd {
457 args := []interface{}{"TS.GET", key}
458 if options != nil {
459 if options.Latest {
460 args = append(args, "LATEST")
461 }
462 }
463 cmd := newTSTimestampValueCmd(ctx, args...)
464 _ = c(ctx, cmd)
465 return cmd
466 }
467
468
469
470 func (c cmdable) TSGet(ctx context.Context, key string) *TSTimestampValueCmd {
471 args := []interface{}{"TS.GET", key}
472 cmd := newTSTimestampValueCmd(ctx, args...)
473 _ = c(ctx, cmd)
474 return cmd
475 }
476
// TSTimestampValue is one time-series sample as decoded from a reply.
type TSTimestampValue struct {
	// Timestamp is the integer read from the reply for this sample.
	Timestamp int64
	// Value is the sample value, parsed from its string form as a float64.
	Value float64
}
481 type TSTimestampValueCmd struct {
482 baseCmd
483 val TSTimestampValue
484 }
485
486 func newTSTimestampValueCmd(ctx context.Context, args ...interface{}) *TSTimestampValueCmd {
487 return &TSTimestampValueCmd{
488 baseCmd: baseCmd{
489 ctx: ctx,
490 args: args,
491 },
492 }
493 }
494
495 func (cmd *TSTimestampValueCmd) String() string {
496 return cmdString(cmd, cmd.val)
497 }
498
499 func (cmd *TSTimestampValueCmd) SetVal(val TSTimestampValue) {
500 cmd.val = val
501 }
502
503 func (cmd *TSTimestampValueCmd) Result() (TSTimestampValue, error) {
504 return cmd.val, cmd.err
505 }
506
507 func (cmd *TSTimestampValueCmd) Val() TSTimestampValue {
508 return cmd.val
509 }
510
511 func (cmd *TSTimestampValueCmd) readReply(rd *proto.Reader) (err error) {
512 n, err := rd.ReadMapLen()
513 if err != nil {
514 return err
515 }
516 cmd.val = TSTimestampValue{}
517 for i := 0; i < n; i++ {
518 timestamp, err := rd.ReadInt()
519 if err != nil {
520 return err
521 }
522 value, err := rd.ReadString()
523 if err != nil {
524 return err
525 }
526 cmd.val.Timestamp = timestamp
527 cmd.val.Value, err = strconv.ParseFloat(value, 64)
528 if err != nil {
529 return err
530 }
531 }
532
533 return nil
534 }
535
536
537
538 func (c cmdable) TSInfo(ctx context.Context, key string) *MapStringInterfaceCmd {
539 args := []interface{}{"TS.INFO", key}
540 cmd := NewMapStringInterfaceCmd(ctx, args...)
541 _ = c(ctx, cmd)
542 return cmd
543 }
544
545
546
547
548
549 func (c cmdable) TSInfoWithArgs(ctx context.Context, key string, options *TSInfoOptions) *MapStringInterfaceCmd {
550 args := []interface{}{"TS.INFO", key}
551 if options != nil {
552 if options.Debug {
553 args = append(args, "DEBUG")
554 }
555 }
556 cmd := NewMapStringInterfaceCmd(ctx, args...)
557 _ = c(ctx, cmd)
558 return cmd
559 }
560
561
562
563
564
565 func (c cmdable) TSMAdd(ctx context.Context, ktvSlices [][]interface{}) *IntSliceCmd {
566 args := []interface{}{"TS.MADD"}
567 for _, ktv := range ktvSlices {
568 args = append(args, ktv...)
569 }
570 cmd := NewIntSliceCmd(ctx, args...)
571 _ = c(ctx, cmd)
572 return cmd
573 }
574
575
576
577 func (c cmdable) TSQueryIndex(ctx context.Context, filterExpr []string) *StringSliceCmd {
578 args := []interface{}{"TS.QUERYINDEX"}
579 for _, f := range filterExpr {
580 args = append(args, f)
581 }
582 cmd := NewStringSliceCmd(ctx, args...)
583 _ = c(ctx, cmd)
584 return cmd
585 }
586
587
588
589 func (c cmdable) TSRevRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd {
590 args := []interface{}{"TS.REVRANGE", key, fromTimestamp, toTimestamp}
591 cmd := newTSTimestampValueSliceCmd(ctx, args...)
592 _ = c(ctx, cmd)
593 return cmd
594 }
595
596
597
598
599
600
601 func (c cmdable) TSRevRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRevRangeOptions) *TSTimestampValueSliceCmd {
602 args := []interface{}{"TS.REVRANGE", key, fromTimestamp, toTimestamp}
603 if options != nil {
604 if options.Latest {
605 args = append(args, "LATEST")
606 }
607 if options.FilterByTS != nil {
608 args = append(args, "FILTER_BY_TS")
609 for _, f := range options.FilterByTS {
610 args = append(args, f)
611 }
612 }
613 if options.FilterByValue != nil {
614 args = append(args, "FILTER_BY_VALUE")
615 for _, f := range options.FilterByValue {
616 args = append(args, f)
617 }
618 }
619 if options.Count != 0 {
620 args = append(args, "COUNT", options.Count)
621 }
622 if options.Align != nil {
623 args = append(args, "ALIGN", options.Align)
624 }
625 if options.Aggregator != 0 {
626 args = append(args, "AGGREGATION", options.Aggregator.String())
627 }
628 if options.BucketDuration != 0 {
629 args = append(args, options.BucketDuration)
630 }
631 if options.BucketTimestamp != nil {
632 args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
633 }
634 if options.Empty {
635 args = append(args, "EMPTY")
636 }
637 }
638 cmd := newTSTimestampValueSliceCmd(ctx, args...)
639 _ = c(ctx, cmd)
640 return cmd
641 }
642
643
644
645 func (c cmdable) TSRange(ctx context.Context, key string, fromTimestamp int, toTimestamp int) *TSTimestampValueSliceCmd {
646 args := []interface{}{"TS.RANGE", key, fromTimestamp, toTimestamp}
647 cmd := newTSTimestampValueSliceCmd(ctx, args...)
648 _ = c(ctx, cmd)
649 return cmd
650 }
651
652
653
654
655
656
657 func (c cmdable) TSRangeWithArgs(ctx context.Context, key string, fromTimestamp int, toTimestamp int, options *TSRangeOptions) *TSTimestampValueSliceCmd {
658 args := []interface{}{"TS.RANGE", key, fromTimestamp, toTimestamp}
659 if options != nil {
660 if options.Latest {
661 args = append(args, "LATEST")
662 }
663 if options.FilterByTS != nil {
664 args = append(args, "FILTER_BY_TS")
665 for _, f := range options.FilterByTS {
666 args = append(args, f)
667 }
668 }
669 if options.FilterByValue != nil {
670 args = append(args, "FILTER_BY_VALUE")
671 for _, f := range options.FilterByValue {
672 args = append(args, f)
673 }
674 }
675 if options.Count != 0 {
676 args = append(args, "COUNT", options.Count)
677 }
678 if options.Align != nil {
679 args = append(args, "ALIGN", options.Align)
680 }
681 if options.Aggregator != 0 {
682 args = append(args, "AGGREGATION", options.Aggregator.String())
683 }
684 if options.BucketDuration != 0 {
685 args = append(args, options.BucketDuration)
686 }
687 if options.BucketTimestamp != nil {
688 args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
689 }
690 if options.Empty {
691 args = append(args, "EMPTY")
692 }
693 }
694 cmd := newTSTimestampValueSliceCmd(ctx, args...)
695 _ = c(ctx, cmd)
696 return cmd
697 }
698
699 type TSTimestampValueSliceCmd struct {
700 baseCmd
701 val []TSTimestampValue
702 }
703
704 func newTSTimestampValueSliceCmd(ctx context.Context, args ...interface{}) *TSTimestampValueSliceCmd {
705 return &TSTimestampValueSliceCmd{
706 baseCmd: baseCmd{
707 ctx: ctx,
708 args: args,
709 },
710 }
711 }
712
713 func (cmd *TSTimestampValueSliceCmd) String() string {
714 return cmdString(cmd, cmd.val)
715 }
716
717 func (cmd *TSTimestampValueSliceCmd) SetVal(val []TSTimestampValue) {
718 cmd.val = val
719 }
720
721 func (cmd *TSTimestampValueSliceCmd) Result() ([]TSTimestampValue, error) {
722 return cmd.val, cmd.err
723 }
724
725 func (cmd *TSTimestampValueSliceCmd) Val() []TSTimestampValue {
726 return cmd.val
727 }
728
729 func (cmd *TSTimestampValueSliceCmd) readReply(rd *proto.Reader) (err error) {
730 n, err := rd.ReadArrayLen()
731 if err != nil {
732 return err
733 }
734 cmd.val = make([]TSTimestampValue, n)
735 for i := 0; i < n; i++ {
736 _, _ = rd.ReadArrayLen()
737 timestamp, err := rd.ReadInt()
738 if err != nil {
739 return err
740 }
741 value, err := rd.ReadString()
742 if err != nil {
743 return err
744 }
745 cmd.val[i].Timestamp = timestamp
746 cmd.val[i].Value, err = strconv.ParseFloat(value, 64)
747 if err != nil {
748 return err
749 }
750 }
751
752 return nil
753 }
754
755
756
757 func (c cmdable) TSMRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd {
758 args := []interface{}{"TS.MRANGE", fromTimestamp, toTimestamp, "FILTER"}
759 for _, f := range filterExpr {
760 args = append(args, f)
761 }
762 cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
763 _ = c(ctx, cmd)
764 return cmd
765 }
766
767
768
769
770
771
772
773 func (c cmdable) TSMRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRangeOptions) *MapStringSliceInterfaceCmd {
774 args := []interface{}{"TS.MRANGE", fromTimestamp, toTimestamp}
775 if options != nil {
776 if options.Latest {
777 args = append(args, "LATEST")
778 }
779 if options.FilterByTS != nil {
780 args = append(args, "FILTER_BY_TS")
781 for _, f := range options.FilterByTS {
782 args = append(args, f)
783 }
784 }
785 if options.FilterByValue != nil {
786 args = append(args, "FILTER_BY_VALUE")
787 for _, f := range options.FilterByValue {
788 args = append(args, f)
789 }
790 }
791 if options.WithLabels {
792 args = append(args, "WITHLABELS")
793 }
794 if options.SelectedLabels != nil {
795 args = append(args, "SELECTED_LABELS")
796 args = append(args, options.SelectedLabels...)
797 }
798 if options.Count != 0 {
799 args = append(args, "COUNT", options.Count)
800 }
801 if options.Align != nil {
802 args = append(args, "ALIGN", options.Align)
803 }
804 if options.Aggregator != 0 {
805 args = append(args, "AGGREGATION", options.Aggregator.String())
806 }
807 if options.BucketDuration != 0 {
808 args = append(args, options.BucketDuration)
809 }
810 if options.BucketTimestamp != nil {
811 args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
812 }
813 if options.Empty {
814 args = append(args, "EMPTY")
815 }
816 }
817 args = append(args, "FILTER")
818 for _, f := range filterExpr {
819 args = append(args, f)
820 }
821 if options != nil {
822 if options.GroupByLabel != nil {
823 args = append(args, "GROUPBY", options.GroupByLabel)
824 }
825 if options.Reducer != nil {
826 args = append(args, "REDUCE", options.Reducer)
827 }
828 }
829 cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
830 _ = c(ctx, cmd)
831 return cmd
832 }
833
834
835
836 func (c cmdable) TSMRevRange(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string) *MapStringSliceInterfaceCmd {
837 args := []interface{}{"TS.MREVRANGE", fromTimestamp, toTimestamp, "FILTER"}
838 for _, f := range filterExpr {
839 args = append(args, f)
840 }
841 cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
842 _ = c(ctx, cmd)
843 return cmd
844 }
845
846
847
848
849
850
851
852 func (c cmdable) TSMRevRangeWithArgs(ctx context.Context, fromTimestamp int, toTimestamp int, filterExpr []string, options *TSMRevRangeOptions) *MapStringSliceInterfaceCmd {
853 args := []interface{}{"TS.MREVRANGE", fromTimestamp, toTimestamp}
854 if options != nil {
855 if options.Latest {
856 args = append(args, "LATEST")
857 }
858 if options.FilterByTS != nil {
859 args = append(args, "FILTER_BY_TS")
860 for _, f := range options.FilterByTS {
861 args = append(args, f)
862 }
863 }
864 if options.FilterByValue != nil {
865 args = append(args, "FILTER_BY_VALUE")
866 for _, f := range options.FilterByValue {
867 args = append(args, f)
868 }
869 }
870 if options.WithLabels {
871 args = append(args, "WITHLABELS")
872 }
873 if options.SelectedLabels != nil {
874 args = append(args, "SELECTED_LABELS")
875 args = append(args, options.SelectedLabels...)
876 }
877 if options.Count != 0 {
878 args = append(args, "COUNT", options.Count)
879 }
880 if options.Align != nil {
881 args = append(args, "ALIGN", options.Align)
882 }
883 if options.Aggregator != 0 {
884 args = append(args, "AGGREGATION", options.Aggregator.String())
885 }
886 if options.BucketDuration != 0 {
887 args = append(args, options.BucketDuration)
888 }
889 if options.BucketTimestamp != nil {
890 args = append(args, "BUCKETTIMESTAMP", options.BucketTimestamp)
891 }
892 if options.Empty {
893 args = append(args, "EMPTY")
894 }
895 }
896 args = append(args, "FILTER")
897 for _, f := range filterExpr {
898 args = append(args, f)
899 }
900 if options != nil {
901 if options.GroupByLabel != nil {
902 args = append(args, "GROUPBY", options.GroupByLabel)
903 }
904 if options.Reducer != nil {
905 args = append(args, "REDUCE", options.Reducer)
906 }
907 }
908 cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
909 _ = c(ctx, cmd)
910 return cmd
911 }
912
913
914
915 func (c cmdable) TSMGet(ctx context.Context, filters []string) *MapStringSliceInterfaceCmd {
916 args := []interface{}{"TS.MGET", "FILTER"}
917 for _, f := range filters {
918 args = append(args, f)
919 }
920 cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
921 _ = c(ctx, cmd)
922 return cmd
923 }
924
925
926
927
928
929 func (c cmdable) TSMGetWithArgs(ctx context.Context, filters []string, options *TSMGetOptions) *MapStringSliceInterfaceCmd {
930 args := []interface{}{"TS.MGET"}
931 if options != nil {
932 if options.Latest {
933 args = append(args, "LATEST")
934 }
935 if options.WithLabels {
936 args = append(args, "WITHLABELS")
937 }
938 if options.SelectedLabels != nil {
939 args = append(args, "SELECTED_LABELS")
940 args = append(args, options.SelectedLabels...)
941 }
942 }
943 args = append(args, "FILTER")
944 for _, f := range filters {
945 args = append(args, f)
946 }
947 cmd := NewMapStringSliceInterfaceCmd(ctx, args...)
948 _ = c(ctx, cmd)
949 return cmd
950 }
951
View as plain text