...

Source file src/github.com/redis/go-redis/v9/internal/proto/reader.go

Documentation: github.com/redis/go-redis/v9/internal/proto

     1  package proto
     2  
     3  import (
     4  	"bufio"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  	"math"
     9  	"math/big"
    10  	"strconv"
    11  
    12  	"github.com/redis/go-redis/v9/internal/util"
    13  )
    14  
    15  // DefaultBufferSize is the default size for read/write buffers (32 KiB).
    16  const DefaultBufferSize = 32 * 1024
    17  
    18  // redis resp protocol data type.
    19  const (
    20  	RespStatus    = '+' // +<string>\r\n
    21  	RespError     = '-' // -<string>\r\n
    22  	RespString    = '$' // $<length>\r\n<bytes>\r\n
    23  	RespInt       = ':' // :<number>\r\n
    24  	RespNil       = '_' // _\r\n
    25  	RespFloat     = ',' // ,<floating-point-number>\r\n (golang float)
    26  	RespBool      = '#' // true: #t\r\n false: #f\r\n
    27  	RespBlobError = '!' // !<length>\r\n<bytes>\r\n
    28  	RespVerbatim  = '=' // =<length>\r\nFORMAT:<bytes>\r\n
    29  	RespBigInt    = '(' // (<big number>\r\n
    30  	RespArray     = '*' // *<len>\r\n... (same as resp2)
    31  	RespMap       = '%' // %<len>\r\n(key)\r\n(value)\r\n... (golang map)
    32  	RespSet       = '~' // ~<len>\r\n... (same as Array)
    33  	RespAttr      = '|' // |<len>\r\n(key)\r\n(value)\r\n... + command reply
    34  	RespPush      = '>' // ><len>\r\n... (same as Array)
    35  )
    36  
    37  // Not used temporarily.
    38  // Redis has not used these two data types for the time being, and will implement them later.
    39  // Streamed           = "EOF:"
    40  // StreamedAggregated = '?'
    41  
    42  //------------------------------------------------------------------------------
    43  
    44  const Nil = RedisError("redis: nil") // nolint:errname
    45  
    46  type RedisError string
    47  
    48  func (e RedisError) Error() string { return string(e) }
    49  
    50  func (RedisError) RedisError() {}
    51  
    52  func ParseErrorReply(line []byte) error {
    53  	return RedisError(line[1:])
    54  }
    55  
    56  //------------------------------------------------------------------------------
    57  
    58  type Reader struct {
    59  	rd *bufio.Reader
    60  }
    61  
    62  func NewReader(rd io.Reader) *Reader {
    63  	return &Reader{
    64  		rd: bufio.NewReaderSize(rd, DefaultBufferSize),
    65  	}
    66  }
    67  
    68  func NewReaderSize(rd io.Reader, size int) *Reader {
    69  	return &Reader{
    70  		rd: bufio.NewReaderSize(rd, size),
    71  	}
    72  }
    73  
    74  func (r *Reader) Buffered() int {
    75  	return r.rd.Buffered()
    76  }
    77  
    78  func (r *Reader) Peek(n int) ([]byte, error) {
    79  	return r.rd.Peek(n)
    80  }
    81  
    82  func (r *Reader) Reset(rd io.Reader) {
    83  	r.rd.Reset(rd)
    84  }
    85  
    86  // PeekReplyType returns the data type of the next response without advancing the Reader,
    87  // and discard the attribute type.
    88  func (r *Reader) PeekReplyType() (byte, error) {
    89  	b, err := r.rd.Peek(1)
    90  	if err != nil {
    91  		return 0, err
    92  	}
    93  	if b[0] == RespAttr {
    94  		if err = r.DiscardNext(); err != nil {
    95  			return 0, err
    96  		}
    97  		return r.PeekReplyType()
    98  	}
    99  	return b[0], nil
   100  }
   101  
   102  // ReadLine Return a valid reply, it will check the protocol or redis error,
   103  // and discard the attribute type.
   104  func (r *Reader) ReadLine() ([]byte, error) {
   105  	line, err := r.readLine()
   106  	if err != nil {
   107  		return nil, err
   108  	}
   109  	switch line[0] {
   110  	case RespError:
   111  		return nil, ParseErrorReply(line)
   112  	case RespNil:
   113  		return nil, Nil
   114  	case RespBlobError:
   115  		var blobErr string
   116  		blobErr, err = r.readStringReply(line)
   117  		if err == nil {
   118  			err = RedisError(blobErr)
   119  		}
   120  		return nil, err
   121  	case RespAttr:
   122  		if err = r.Discard(line); err != nil {
   123  			return nil, err
   124  		}
   125  		return r.ReadLine()
   126  	}
   127  
   128  	// Compatible with RESP2
   129  	if IsNilReply(line) {
   130  		return nil, Nil
   131  	}
   132  
   133  	return line, nil
   134  }
   135  
   136  // readLine returns an error if:
   137  //   - there is a pending read error;
   138  //   - or line does not end with \r\n.
   139  func (r *Reader) readLine() ([]byte, error) {
   140  	b, err := r.rd.ReadSlice('\n')
   141  	if err != nil {
   142  		if err != bufio.ErrBufferFull {
   143  			return nil, err
   144  		}
   145  
   146  		full := make([]byte, len(b))
   147  		copy(full, b)
   148  
   149  		b, err = r.rd.ReadBytes('\n')
   150  		if err != nil {
   151  			return nil, err
   152  		}
   153  
   154  		full = append(full, b...) //nolint:makezero
   155  		b = full
   156  	}
   157  	if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
   158  		return nil, fmt.Errorf("redis: invalid reply: %q", b)
   159  	}
   160  	return b[:len(b)-2], nil
   161  }
   162  
   163  func (r *Reader) ReadReply() (interface{}, error) {
   164  	line, err := r.ReadLine()
   165  	if err != nil {
   166  		return nil, err
   167  	}
   168  
   169  	switch line[0] {
   170  	case RespStatus:
   171  		return string(line[1:]), nil
   172  	case RespInt:
   173  		return util.ParseInt(line[1:], 10, 64)
   174  	case RespFloat:
   175  		return r.readFloat(line)
   176  	case RespBool:
   177  		return r.readBool(line)
   178  	case RespBigInt:
   179  		return r.readBigInt(line)
   180  
   181  	case RespString:
   182  		return r.readStringReply(line)
   183  	case RespVerbatim:
   184  		return r.readVerb(line)
   185  
   186  	case RespArray, RespSet, RespPush:
   187  		return r.readSlice(line)
   188  	case RespMap:
   189  		return r.readMap(line)
   190  	}
   191  	return nil, fmt.Errorf("redis: can't parse %.100q", line)
   192  }
   193  
   194  func (r *Reader) readFloat(line []byte) (float64, error) {
   195  	v := string(line[1:])
   196  	switch string(line[1:]) {
   197  	case "inf":
   198  		return math.Inf(1), nil
   199  	case "-inf":
   200  		return math.Inf(-1), nil
   201  	case "nan", "-nan":
   202  		return math.NaN(), nil
   203  	}
   204  	return strconv.ParseFloat(v, 64)
   205  }
   206  
   207  func (r *Reader) readBool(line []byte) (bool, error) {
   208  	switch string(line[1:]) {
   209  	case "t":
   210  		return true, nil
   211  	case "f":
   212  		return false, nil
   213  	}
   214  	return false, fmt.Errorf("redis: can't parse bool reply: %q", line)
   215  }
   216  
   217  func (r *Reader) readBigInt(line []byte) (*big.Int, error) {
   218  	i := new(big.Int)
   219  	if i, ok := i.SetString(string(line[1:]), 10); ok {
   220  		return i, nil
   221  	}
   222  	return nil, fmt.Errorf("redis: can't parse bigInt reply: %q", line)
   223  }
   224  
   225  func (r *Reader) readStringReply(line []byte) (string, error) {
   226  	n, err := replyLen(line)
   227  	if err != nil {
   228  		return "", err
   229  	}
   230  
   231  	b := make([]byte, n+2)
   232  	_, err = io.ReadFull(r.rd, b)
   233  	if err != nil {
   234  		return "", err
   235  	}
   236  
   237  	return util.BytesToString(b[:n]), nil
   238  }
   239  
   240  func (r *Reader) readVerb(line []byte) (string, error) {
   241  	s, err := r.readStringReply(line)
   242  	if err != nil {
   243  		return "", err
   244  	}
   245  	if len(s) < 4 || s[3] != ':' {
   246  		return "", fmt.Errorf("redis: can't parse verbatim string reply: %q", line)
   247  	}
   248  	return s[4:], nil
   249  }
   250  
   251  func (r *Reader) readSlice(line []byte) ([]interface{}, error) {
   252  	n, err := replyLen(line)
   253  	if err != nil {
   254  		return nil, err
   255  	}
   256  
   257  	val := make([]interface{}, n)
   258  	for i := 0; i < len(val); i++ {
   259  		v, err := r.ReadReply()
   260  		if err != nil {
   261  			if err == Nil {
   262  				val[i] = nil
   263  				continue
   264  			}
   265  			if err, ok := err.(RedisError); ok {
   266  				val[i] = err
   267  				continue
   268  			}
   269  			return nil, err
   270  		}
   271  		val[i] = v
   272  	}
   273  	return val, nil
   274  }
   275  
   276  func (r *Reader) readMap(line []byte) (map[interface{}]interface{}, error) {
   277  	n, err := replyLen(line)
   278  	if err != nil {
   279  		return nil, err
   280  	}
   281  	m := make(map[interface{}]interface{}, n)
   282  	for i := 0; i < n; i++ {
   283  		k, err := r.ReadReply()
   284  		if err != nil {
   285  			return nil, err
   286  		}
   287  		v, err := r.ReadReply()
   288  		if err != nil {
   289  			if err == Nil {
   290  				m[k] = nil
   291  				continue
   292  			}
   293  			if err, ok := err.(RedisError); ok {
   294  				m[k] = err
   295  				continue
   296  			}
   297  			return nil, err
   298  		}
   299  		m[k] = v
   300  	}
   301  	return m, nil
   302  }
   303  
   304  // -------------------------------
   305  
   306  func (r *Reader) ReadInt() (int64, error) {
   307  	line, err := r.ReadLine()
   308  	if err != nil {
   309  		return 0, err
   310  	}
   311  	switch line[0] {
   312  	case RespInt, RespStatus:
   313  		return util.ParseInt(line[1:], 10, 64)
   314  	case RespString:
   315  		s, err := r.readStringReply(line)
   316  		if err != nil {
   317  			return 0, err
   318  		}
   319  		return util.ParseInt([]byte(s), 10, 64)
   320  	case RespBigInt:
   321  		b, err := r.readBigInt(line)
   322  		if err != nil {
   323  			return 0, err
   324  		}
   325  		if !b.IsInt64() {
   326  			return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
   327  		}
   328  		return b.Int64(), nil
   329  	}
   330  	return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
   331  }
   332  
   333  func (r *Reader) ReadUint() (uint64, error) {
   334  	line, err := r.ReadLine()
   335  	if err != nil {
   336  		return 0, err
   337  	}
   338  	switch line[0] {
   339  	case RespInt, RespStatus:
   340  		return util.ParseUint(line[1:], 10, 64)
   341  	case RespString:
   342  		s, err := r.readStringReply(line)
   343  		if err != nil {
   344  			return 0, err
   345  		}
   346  		return util.ParseUint([]byte(s), 10, 64)
   347  	case RespBigInt:
   348  		b, err := r.readBigInt(line)
   349  		if err != nil {
   350  			return 0, err
   351  		}
   352  		if !b.IsUint64() {
   353  			return 0, fmt.Errorf("bigInt(%s) value out of range", b.String())
   354  		}
   355  		return b.Uint64(), nil
   356  	}
   357  	return 0, fmt.Errorf("redis: can't parse uint reply: %.100q", line)
   358  }
   359  
   360  func (r *Reader) ReadFloat() (float64, error) {
   361  	line, err := r.ReadLine()
   362  	if err != nil {
   363  		return 0, err
   364  	}
   365  	switch line[0] {
   366  	case RespFloat:
   367  		return r.readFloat(line)
   368  	case RespStatus:
   369  		return strconv.ParseFloat(string(line[1:]), 64)
   370  	case RespString:
   371  		s, err := r.readStringReply(line)
   372  		if err != nil {
   373  			return 0, err
   374  		}
   375  		return strconv.ParseFloat(s, 64)
   376  	}
   377  	return 0, fmt.Errorf("redis: can't parse float reply: %.100q", line)
   378  }
   379  
   380  func (r *Reader) ReadString() (string, error) {
   381  	line, err := r.ReadLine()
   382  	if err != nil {
   383  		return "", err
   384  	}
   385  
   386  	switch line[0] {
   387  	case RespStatus, RespInt, RespFloat:
   388  		return string(line[1:]), nil
   389  	case RespString:
   390  		return r.readStringReply(line)
   391  	case RespBool:
   392  		b, err := r.readBool(line)
   393  		return strconv.FormatBool(b), err
   394  	case RespVerbatim:
   395  		return r.readVerb(line)
   396  	case RespBigInt:
   397  		b, err := r.readBigInt(line)
   398  		if err != nil {
   399  			return "", err
   400  		}
   401  		return b.String(), nil
   402  	}
   403  	return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
   404  }
   405  
   406  func (r *Reader) ReadBool() (bool, error) {
   407  	s, err := r.ReadString()
   408  	if err != nil {
   409  		return false, err
   410  	}
   411  	return s == "OK" || s == "1" || s == "true", nil
   412  }
   413  
   414  func (r *Reader) ReadSlice() ([]interface{}, error) {
   415  	line, err := r.ReadLine()
   416  	if err != nil {
   417  		return nil, err
   418  	}
   419  	return r.readSlice(line)
   420  }
   421  
   422  // ReadFixedArrayLen read fixed array length.
   423  func (r *Reader) ReadFixedArrayLen(fixedLen int) error {
   424  	n, err := r.ReadArrayLen()
   425  	if err != nil {
   426  		return err
   427  	}
   428  	if n != fixedLen {
   429  		return fmt.Errorf("redis: got %d elements in the array, wanted %d", n, fixedLen)
   430  	}
   431  	return nil
   432  }
   433  
   434  // ReadArrayLen Read and return the length of the array.
   435  func (r *Reader) ReadArrayLen() (int, error) {
   436  	line, err := r.ReadLine()
   437  	if err != nil {
   438  		return 0, err
   439  	}
   440  	switch line[0] {
   441  	case RespArray, RespSet, RespPush:
   442  		return replyLen(line)
   443  	default:
   444  		return 0, fmt.Errorf("redis: can't parse array/set/push reply: %.100q", line)
   445  	}
   446  }
   447  
   448  // ReadFixedMapLen reads fixed map length.
   449  func (r *Reader) ReadFixedMapLen(fixedLen int) error {
   450  	n, err := r.ReadMapLen()
   451  	if err != nil {
   452  		return err
   453  	}
   454  	if n != fixedLen {
   455  		return fmt.Errorf("redis: got %d elements in the map, wanted %d", n, fixedLen)
   456  	}
   457  	return nil
   458  }
   459  
   460  // ReadMapLen reads the length of the map type.
   461  // If responding to the array type (RespArray/RespSet/RespPush),
   462  // it must be a multiple of 2 and return n/2.
   463  // Other types will return an error.
   464  func (r *Reader) ReadMapLen() (int, error) {
   465  	line, err := r.ReadLine()
   466  	if err != nil {
   467  		return 0, err
   468  	}
   469  	switch line[0] {
   470  	case RespMap:
   471  		return replyLen(line)
   472  	case RespArray, RespSet, RespPush:
   473  		// Some commands and RESP2 protocol may respond to array types.
   474  		n, err := replyLen(line)
   475  		if err != nil {
   476  			return 0, err
   477  		}
   478  		if n%2 != 0 {
   479  			return 0, fmt.Errorf("redis: the length of the array must be a multiple of 2, got: %d", n)
   480  		}
   481  		return n / 2, nil
   482  	default:
   483  		return 0, fmt.Errorf("redis: can't parse map reply: %.100q", line)
   484  	}
   485  }
   486  
   487  // DiscardNext read and discard the data represented by the next line.
   488  func (r *Reader) DiscardNext() error {
   489  	line, err := r.readLine()
   490  	if err != nil {
   491  		return err
   492  	}
   493  	return r.Discard(line)
   494  }
   495  
   496  // Discard the data represented by line.
   497  func (r *Reader) Discard(line []byte) (err error) {
   498  	if len(line) == 0 {
   499  		return errors.New("redis: invalid line")
   500  	}
   501  	switch line[0] {
   502  	case RespStatus, RespError, RespInt, RespNil, RespFloat, RespBool, RespBigInt:
   503  		return nil
   504  	}
   505  
   506  	n, err := replyLen(line)
   507  	if err != nil && err != Nil {
   508  		return err
   509  	}
   510  
   511  	switch line[0] {
   512  	case RespBlobError, RespString, RespVerbatim:
   513  		// +\r\n
   514  		_, err = r.rd.Discard(n + 2)
   515  		return err
   516  	case RespArray, RespSet, RespPush:
   517  		for i := 0; i < n; i++ {
   518  			if err = r.DiscardNext(); err != nil {
   519  				return err
   520  			}
   521  		}
   522  		return nil
   523  	case RespMap, RespAttr:
   524  		// Read key & value.
   525  		for i := 0; i < n*2; i++ {
   526  			if err = r.DiscardNext(); err != nil {
   527  				return err
   528  			}
   529  		}
   530  		return nil
   531  	}
   532  
   533  	return fmt.Errorf("redis: can't parse %.100q", line)
   534  }
   535  
   536  func replyLen(line []byte) (n int, err error) {
   537  	n, err = util.Atoi(line[1:])
   538  	if err != nil {
   539  		return 0, err
   540  	}
   541  
   542  	if n < -1 {
   543  		return 0, fmt.Errorf("redis: invalid reply: %q", line)
   544  	}
   545  
   546  	switch line[0] {
   547  	case RespString, RespVerbatim, RespBlobError,
   548  		RespArray, RespSet, RespPush, RespMap, RespAttr:
   549  		if n == -1 {
   550  			return 0, Nil
   551  		}
   552  	}
   553  	return n, nil
   554  }
   555  
   556  // IsNilReply detects redis.Nil of RESP2.
   557  func IsNilReply(line []byte) bool {
   558  	return len(line) == 3 &&
   559  		(line[0] == RespString || line[0] == RespArray) &&
   560  		line[1] == '-' && line[2] == '1'
   561  }
   562  

View as plain text