...

Source file src/github.com/redis/go-redis/v9/internal/pool/conn.go

Documentation: github.com/redis/go-redis/v9/internal/pool

     1  package pool
     2  
     3  import (
     4  	"bufio"
     5  	"context"
     6  	"net"
     7  	"sync/atomic"
     8  	"time"
     9  
    10  	"github.com/redis/go-redis/v9/internal/proto"
    11  )
    12  
    13  var noDeadline = time.Time{}
    14  
    15  type Conn struct {
    16  	usedAt  int64 // atomic
    17  	netConn net.Conn
    18  
    19  	rd *proto.Reader
    20  	bw *bufio.Writer
    21  	wr *proto.Writer
    22  
    23  	Inited    bool
    24  	pooled    bool
    25  	createdAt time.Time
    26  
    27  	onClose func() error
    28  }
    29  
    30  func NewConn(netConn net.Conn) *Conn {
    31  	return NewConnWithBufferSize(netConn, proto.DefaultBufferSize, proto.DefaultBufferSize)
    32  }
    33  
    34  func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn {
    35  	cn := &Conn{
    36  		netConn:   netConn,
    37  		createdAt: time.Now(),
    38  	}
    39  
    40  	// Use specified buffer sizes, or fall back to 32KiB defaults if 0
    41  	if readBufSize > 0 {
    42  		cn.rd = proto.NewReaderSize(netConn, readBufSize)
    43  	} else {
    44  		cn.rd = proto.NewReader(netConn) // Uses 32KiB default
    45  	}
    46  
    47  	if writeBufSize > 0 {
    48  		cn.bw = bufio.NewWriterSize(netConn, writeBufSize)
    49  	} else {
    50  		cn.bw = bufio.NewWriterSize(netConn, proto.DefaultBufferSize)
    51  	}
    52  
    53  	cn.wr = proto.NewWriter(cn.bw)
    54  	cn.SetUsedAt(time.Now())
    55  	return cn
    56  }
    57  
    58  func (cn *Conn) UsedAt() time.Time {
    59  	unix := atomic.LoadInt64(&cn.usedAt)
    60  	return time.Unix(unix, 0)
    61  }
    62  
    63  func (cn *Conn) SetUsedAt(tm time.Time) {
    64  	atomic.StoreInt64(&cn.usedAt, tm.Unix())
    65  }
    66  
    67  func (cn *Conn) SetOnClose(fn func() error) {
    68  	cn.onClose = fn
    69  }
    70  
    71  func (cn *Conn) SetNetConn(netConn net.Conn) {
    72  	cn.netConn = netConn
    73  	cn.rd.Reset(netConn)
    74  	cn.bw.Reset(netConn)
    75  }
    76  
    77  func (cn *Conn) Write(b []byte) (int, error) {
    78  	return cn.netConn.Write(b)
    79  }
    80  
    81  func (cn *Conn) RemoteAddr() net.Addr {
    82  	if cn.netConn != nil {
    83  		return cn.netConn.RemoteAddr()
    84  	}
    85  	return nil
    86  }
    87  
    88  func (cn *Conn) WithReader(
    89  	ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
    90  ) error {
    91  	if timeout >= 0 {
    92  		if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
    93  			return err
    94  		}
    95  	}
    96  	return fn(cn.rd)
    97  }
    98  
    99  func (cn *Conn) WithWriter(
   100  	ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
   101  ) error {
   102  	if timeout >= 0 {
   103  		if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
   104  			return err
   105  		}
   106  	}
   107  
   108  	if cn.bw.Buffered() > 0 {
   109  		cn.bw.Reset(cn.netConn)
   110  	}
   111  
   112  	if err := fn(cn.wr); err != nil {
   113  		return err
   114  	}
   115  
   116  	return cn.bw.Flush()
   117  }
   118  
   119  func (cn *Conn) Close() error {
   120  	if cn.onClose != nil {
   121  		// ignore error
   122  		_ = cn.onClose()
   123  	}
   124  	return cn.netConn.Close()
   125  }
   126  
   127  func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
   128  	tm := time.Now()
   129  	cn.SetUsedAt(tm)
   130  
   131  	if timeout > 0 {
   132  		tm = tm.Add(timeout)
   133  	}
   134  
   135  	if ctx != nil {
   136  		deadline, ok := ctx.Deadline()
   137  		if ok {
   138  			if timeout == 0 {
   139  				return deadline
   140  			}
   141  			if deadline.Before(tm) {
   142  				return deadline
   143  			}
   144  			return tm
   145  		}
   146  	}
   147  
   148  	if timeout > 0 {
   149  		return tm
   150  	}
   151  
   152  	return noDeadline
   153  }
   154  

View as plain text