1 package proto
2
3 import (
4 "bufio"
5 "errors"
6 "fmt"
7 "io"
8 "math"
9 "math/big"
10 "strconv"
11
12 "github.com/redis/go-redis/v9/internal/util"
13 )
14
15
// DefaultBufferSize is the size in bytes (32 KiB) of the buffer that
// NewReader gives to its underlying bufio.Reader.
const DefaultBufferSize = 32 << 10
17
18
// RESP reply type markers. The first byte of every reply line is one of
// these characters and identifies the kind of value that follows.
const (
	RespStatus    = '+' // simple string (status) reply
	RespError     = '-' // simple error reply
	RespString    = '$' // length-prefixed bulk string
	RespInt       = ':' // integer
	RespNil       = '_' // null
	RespFloat     = ',' // double
	RespBool      = '#' // boolean, encoded as "t" or "f"
	RespBlobError = '!' // length-prefixed error
	RespVerbatim  = '=' // length-prefixed verbatim string
	RespBigInt    = '(' // big number
	RespArray     = '*' // array
	RespMap       = '%' // map of key/value pairs
	RespSet       = '~' // set
	RespAttr      = '|' // attribute map preceding a reply
	RespPush      = '>' // push message
)
36
37
38
39
40
41
42
43
// RedisError is an error whose text is carried verbatim as a string. It is
// the type used for error replies parsed from the wire and for Nil.
type RedisError string

// Nil is the sentinel error that stands for a null reply.
const Nil RedisError = "redis: nil"

// Error implements the error interface by returning the message unchanged.
func (e RedisError) Error() string {
	return string(e)
}

// RedisError is a no-op marker method.
// NOTE(review): presumably lets other packages recognize this error type
// through an interface check — confirm against callers.
func (RedisError) RedisError() {}

// ParseErrorReply converts an error reply line into a RedisError. The first
// byte of line (the reply type marker) is dropped; the rest is the message.
// line must not be empty.
func ParseErrorReply(line []byte) error {
	message := line[1:]
	return RedisError(message)
}
55
56
57
58 type Reader struct {
59 rd *bufio.Reader
60 }
61
62 func NewReader(rd io.Reader) *Reader {
63 return &Reader{
64 rd: bufio.NewReaderSize(rd, DefaultBufferSize),
65 }
66 }
67
68 func NewReaderSize(rd io.Reader, size int) *Reader {
69 return &Reader{
70 rd: bufio.NewReaderSize(rd, size),
71 }
72 }
73
74 func (r *Reader) Buffered() int {
75 return r.rd.Buffered()
76 }
77
78 func (r *Reader) Peek(n int) ([]byte, error) {
79 return r.rd.Peek(n)
80 }
81
82 func (r *Reader) Reset(rd io.Reader) {
83 r.rd.Reset(rd)
84 }
85
86
87
88 func (r *Reader) PeekReplyType() (byte, error) {
89 b, err := r.rd.Peek(1)
90 if err != nil {
91 return 0, err
92 }
93 if b[0] == RespAttr {
94 if err = r.DiscardNext(); err != nil {
95 return 0, err
96 }
97 return r.PeekReplyType()
98 }
99 return b[0], nil
100 }
101
102
103
104 func (r *Reader) ReadLine() ([]byte, error) {
105 line, err := r.readLine()
106 if err != nil {
107 return nil, err
108 }
109 switch line[0] {
110 case RespError:
111 return nil, ParseErrorReply(line)
112 case RespNil:
113 return nil, Nil
114 case RespBlobError:
115 var blobErr string
116 blobErr, err = r.readStringReply(line)
117 if err == nil {
118 err = RedisError(blobErr)
119 }
120 return nil, err
121 case RespAttr:
122 if err = r.Discard(line); err != nil {
123 return nil, err
124 }
125 return r.ReadLine()
126 }
127
128
129 if IsNilReply(line) {
130 return nil, Nil
131 }
132
133 return line, nil
134 }
135
136
137
138
139 func (r *Reader) readLine() ([]byte, error) {
140 b, err := r.rd.ReadSlice('\n')
141 if err != nil {
142 if err != bufio.ErrBufferFull {
143 return nil, err
144 }
145
146 full := make([]byte, len(b))
147 copy(full, b)
148
149 b, err = r.rd.ReadBytes('\n')
150 if err != nil {
151 return nil, err
152 }
153
154 full = append(full, b...)
155 b = full
156 }
157 if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
158 return nil, fmt.Errorf("redis: invalid reply: %q", b)
159 }
160 return b[:len(b)-2], nil
161 }
162
163 func (r *Reader) ReadReply() (interface{}, error) {
164 line, err := r.ReadLine()
165 if err != nil {
166 return nil, err
167 }
168
169 switch line[0] {
170 case RespStatus:
171 return string(line[1:]), nil
172 case RespInt:
173 return util.ParseInt(line[1:], 10, 64)
174 case RespFloat:
175 return r.readFloat(line)
176 case RespBool:
177 return r.readBool(line)
178 case RespBigInt:
179 return r.readBigInt(line)
180
181 case RespString:
182 return r.readStringReply(line)
183 case RespVerbatim:
184 return r.readVerb(line)
185
186 case RespArray, RespSet, RespPush:
187 return r.readSlice(line)
188 case RespMap:
189 return r.readMap(line)
190 }
191 return nil, fmt.Errorf("redis: can't parse %.100q", line)
192 }
193
194 func (r *Reader) readFloat(line []byte) (float64, error) {
195 v := string(line[1:])
196 switch string(line[1:]) {
197 case "inf":
198 return math.Inf(1), nil
199 case "-inf":
200 return math.Inf(-1), nil
201 case "nan", "-nan":
202 return math.NaN(), nil
203 }
204 return strconv.ParseFloat(v, 64)
205 }
206
207 func (r *Reader) readBool(line []byte) (bool, error) {
208 switch string(line[1:]) {
209 case "t":
210 return true, nil
211 case "f":
212 return false, nil
213 }
214 return false, fmt.Errorf("redis: can't parse bool reply: %q", line)
215 }
216
217 func (r *Reader) readBigInt(line []byte) (*big.Int, error) {
218 i := new(big.Int)
219 if i, ok := i.SetString(string(line[1:]), 10); ok {
220 return i, nil
221 }
222 return nil, fmt.Errorf("redis: can't parse bigInt reply: %q", line)
223 }
224
225 func (r *Reader) readStringReply(line []byte) (string, error) {
226 n, err := replyLen(line)
227 if err != nil {
228 return "", err
229 }
230
231 b := make([]byte, n+2)
232 _, err = io.ReadFull(r.rd, b)
233 if err != nil {
234 return "", err
235 }
236
237 return util.BytesToString(b[:n]), nil
238 }
239
240 func (r *Reader) readVerb(line []byte) (string, error) {
241 s, err := r.readStringReply(line)
242 if err != nil {
243 return "", err
244 }
245 if len(s) < 4 || s[3] != ':' {
246 return "", fmt.Errorf("redis: can't parse verbatim string reply: %q", line)
247 }
248 return s[4:], nil
249 }
250
251 func (r *Reader) readSlice(line []byte) ([]interface{}, error) {
252 n, err := replyLen(line)
253 if err != nil {
254 return nil, err
255 }
256
257 val := make([]interface{}, n)
258 for i := 0; i < len(val); i++ {
259 v, err := r.ReadReply()
260 if err != nil {
261 if err == Nil {
262 val[i] = nil
263 continue
264 }
265 if err, ok := err.(RedisError); ok {
266 val[i] = err
267 continue
268 }
269 return nil, err
270 }
271 val[i] = v
272 }
273 return val, nil
274 }
275
276 func (r *Reader) readMap(line []byte) (map[interface{}]interface{}, error) {
277 n, err := replyLen(line)
278 if err != nil {
279 return nil, err
280 }
281 m := make(map[interface{}]interface{}, n)
282 for i := 0; i < n; i++ {
283 k, err := r.ReadReply()
284 if err != nil {
285 return nil, err
286 }
287 v, err := r.ReadReply()
288 if err != nil {
289 if err == Nil {
290 m[k] = nil
291 continue
292 }
293 if err, ok := err.(RedisError); ok {
294 m[k] = err
295 continue
296 }
297 return nil, err
298 }
299 m[k] = v
300 }
301 return m, nil
302 }
303
304
305
306 func (r *Reader) ReadInt() (int64, error) {
307 line, err := r.ReadLine()
308 if err != nil {
309 return 0, err
310 }
311 switch line[0] {
312 case RespInt, RespStatus:
313 return util.ParseInt(line[1:], 10, 64)
314 case RespString:
315 s, err := r.readStringReply(line)
316 if err != nil {
317 return 0, err
318 }
319 return util.ParseInt([]byte(s), 10, 64)
320 case RespBigInt:
321 b, err := r.readBigInt(line)
322 if err != nil {
323 return 0, err
324 }
325 if !b.IsInt64() {
326 return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
327 }
328 return b.Int64(), nil
329 }
330 return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
331 }
332
333 func (r *Reader) ReadUint() (uint64, error) {
334 line, err := r.ReadLine()
335 if err != nil {
336 return 0, err
337 }
338 switch line[0] {
339 case RespInt, RespStatus:
340 return util.ParseUint(line[1:], 10, 64)
341 case RespString:
342 s, err := r.readStringReply(line)
343 if err != nil {
344 return 0, err
345 }
346 return util.ParseUint([]byte(s), 10, 64)
347 case RespBigInt:
348 b, err := r.readBigInt(line)
349 if err != nil {
350 return 0, err
351 }
352 if !b.IsUint64() {
353 return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
354 }
355 return b.Uint64(), nil
356 }
357 return 0, fmt.Errorf("redis: can't parse uint reply: %.100q", line)
358 }
359
360 func (r *Reader) ReadFloat() (float64, error) {
361 line, err := r.ReadLine()
362 if err != nil {
363 return 0, err
364 }
365 switch line[0] {
366 case RespFloat:
367 return r.readFloat(line)
368 case RespStatus:
369 return strconv.ParseFloat(string(line[1:]), 64)
370 case RespString:
371 s, err := r.readStringReply(line)
372 if err != nil {
373 return 0, err
374 }
375 return strconv.ParseFloat(s, 64)
376 }
377 return 0, fmt.Errorf("redis: can't parse float reply: %.100q", line)
378 }
379
380 func (r *Reader) ReadString() (string, error) {
381 line, err := r.ReadLine()
382 if err != nil {
383 return "", err
384 }
385
386 switch line[0] {
387 case RespStatus, RespInt, RespFloat:
388 return string(line[1:]), nil
389 case RespString:
390 return r.readStringReply(line)
391 case RespBool:
392 b, err := r.readBool(line)
393 return strconv.FormatBool(b), err
394 case RespVerbatim:
395 return r.readVerb(line)
396 case RespBigInt:
397 b, err := r.readBigInt(line)
398 if err != nil {
399 return "", err
400 }
401 return b.String(), nil
402 }
403 return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
404 }
405
406 func (r *Reader) ReadBool() (bool, error) {
407 s, err := r.ReadString()
408 if err != nil {
409 return false, err
410 }
411 return s == "OK" || s == "1" || s == "true", nil
412 }
413
414 func (r *Reader) ReadSlice() ([]interface{}, error) {
415 line, err := r.ReadLine()
416 if err != nil {
417 return nil, err
418 }
419 return r.readSlice(line)
420 }
421
422
423 func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
424 n, err := r.ReadArrayLen()
425 if err != nil {
426 return err
427 }
428 if n != fixedLen {
429 return fmt.Errorf("redis: got %d elements in the array, wanted %d", n, fixedLen)
430 }
431 return nil
432 }
433
434
435 func (r *Reader) ReadArrayLen() (int, error) {
436 line, err := r.ReadLine()
437 if err != nil {
438 return 0, err
439 }
440 switch line[0] {
441 case RespArray, RespSet, RespPush:
442 return replyLen(line)
443 default:
444 return 0, fmt.Errorf("redis: can't parse array/set/push reply: %.100q", line)
445 }
446 }
447
448
449 func (r *Reader) ReadFixedMapLen(fixedLen int) error {
450 n, err := r.ReadMapLen()
451 if err != nil {
452 return err
453 }
454 if n != fixedLen {
455 return fmt.Errorf("redis: got %d elements in the map, wanted %d", n, fixedLen)
456 }
457 return nil
458 }
459
460
461
462
463
464 func (r *Reader) ReadMapLen() (int, error) {
465 line, err := r.ReadLine()
466 if err != nil {
467 return 0, err
468 }
469 switch line[0] {
470 case RespMap:
471 return replyLen(line)
472 case RespArray, RespSet, RespPush:
473
474 n, err := replyLen(line)
475 if err != nil {
476 return 0, err
477 }
478 if n%2 != 0 {
479 return 0, fmt.Errorf("redis: the length of the array must be a multiple of 2, got: %d", n)
480 }
481 return n / 2, nil
482 default:
483 return 0, fmt.Errorf("redis: can't parse map reply: %.100q", line)
484 }
485 }
486
487
488 func (r *Reader) DiscardNext() error {
489 line, err := r.readLine()
490 if err != nil {
491 return err
492 }
493 return r.Discard(line)
494 }
495
496
497 func (r *Reader) Discard(line []byte) (err error) {
498 if len(line) == 0 {
499 return errors.New("redis: invalid line")
500 }
501 switch line[0] {
502 case RespStatus, RespError, RespInt, RespNil, RespFloat, RespBool, RespBigInt:
503 return nil
504 }
505
506 n, err := replyLen(line)
507 if err != nil && err != Nil {
508 return err
509 }
510
511 switch line[0] {
512 case RespBlobError, RespString, RespVerbatim:
513
514 _, err = r.rd.Discard(n + 2)
515 return err
516 case RespArray, RespSet, RespPush:
517 for i := 0; i < n; i++ {
518 if err = r.DiscardNext(); err != nil {
519 return err
520 }
521 }
522 return nil
523 case RespMap, RespAttr:
524
525 for i := 0; i < n*2; i++ {
526 if err = r.DiscardNext(); err != nil {
527 return err
528 }
529 }
530 return nil
531 }
532
533 return fmt.Errorf("redis: can't parse %.100q", line)
534 }
535
536 func replyLen(line []byte) (n int, err error) {
537 n, err = util.Atoi(line[1:])
538 if err != nil {
539 return 0, err
540 }
541
542 if n < -1 {
543 return 0, fmt.Errorf("redis: invalid reply: %q", line)
544 }
545
546 switch line[0] {
547 case RespString, RespVerbatim, RespBlobError,
548 RespArray, RespSet, RespPush, RespMap, RespAttr:
549 if n == -1 {
550 return 0, Nil
551 }
552 }
553 return n, nil
554 }
555
556
557 func IsNilReply(line []byte) bool {
558 return len(line) == 3 &&
559 (line[0] == RespString || line[0] == RespArray) &&
560 line[1] == '-' && line[2] == '1'
561 }
562
View as plain text