...

Source file src/github.com/redis/go-redis/v9/tx.go

Documentation: github.com/redis/go-redis/v9

     1  package redis
     2  
     3  import (
     4  	"context"
     5  
     6  	"github.com/redis/go-redis/v9/internal/pool"
     7  	"github.com/redis/go-redis/v9/internal/proto"
     8  )
     9  
    10  // TxFailedErr transaction redis failed.
    11  const TxFailedErr = proto.RedisError("redis: transaction failed")
    12  
    13  // Tx implements Redis transactions as described in
    14  // http://redis.io/topics/transactions. It's NOT safe for concurrent use
    15  // by multiple goroutines, because Exec resets list of watched keys.
    16  //
    17  // If you don't need WATCH, use Pipeline instead.
    18  type Tx struct {
    19  	baseClient
    20  	cmdable
    21  	statefulCmdable
    22  }
    23  
    24  func (c *Client) newTx() *Tx {
    25  	tx := Tx{
    26  		baseClient: baseClient{
    27  			opt:        c.opt,
    28  			connPool:   pool.NewStickyConnPool(c.connPool),
    29  			hooksMixin: c.hooksMixin.clone(),
    30  		},
    31  	}
    32  	tx.init()
    33  	return &tx
    34  }
    35  
    36  func (c *Tx) init() {
    37  	c.cmdable = c.Process
    38  	c.statefulCmdable = c.Process
    39  
    40  	c.initHooks(hooks{
    41  		dial:       c.baseClient.dial,
    42  		process:    c.baseClient.process,
    43  		pipeline:   c.baseClient.processPipeline,
    44  		txPipeline: c.baseClient.processTxPipeline,
    45  	})
    46  }
    47  
    48  func (c *Tx) Process(ctx context.Context, cmd Cmder) error {
    49  	err := c.processHook(ctx, cmd)
    50  	cmd.SetErr(err)
    51  	return err
    52  }
    53  
    54  // Watch prepares a transaction and marks the keys to be watched
    55  // for conditional execution if there are any keys.
    56  //
    57  // The transaction is automatically closed when fn exits.
    58  func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
    59  	tx := c.newTx()
    60  	defer tx.Close(ctx)
    61  	if len(keys) > 0 {
    62  		if err := tx.Watch(ctx, keys...).Err(); err != nil {
    63  			return err
    64  		}
    65  	}
    66  	return fn(tx)
    67  }
    68  
    69  // Close closes the transaction, releasing any open resources.
    70  func (c *Tx) Close(ctx context.Context) error {
    71  	_ = c.Unwatch(ctx).Err()
    72  	return c.baseClient.Close()
    73  }
    74  
    75  // Watch marks the keys to be watched for conditional execution
    76  // of a transaction.
    77  func (c *Tx) Watch(ctx context.Context, keys ...string) *StatusCmd {
    78  	args := make([]interface{}, 1+len(keys))
    79  	args[0] = "watch"
    80  	for i, key := range keys {
    81  		args[1+i] = key
    82  	}
    83  	cmd := NewStatusCmd(ctx, args...)
    84  	_ = c.Process(ctx, cmd)
    85  	return cmd
    86  }
    87  
    88  // Unwatch flushes all the previously watched keys for a transaction.
    89  func (c *Tx) Unwatch(ctx context.Context, keys ...string) *StatusCmd {
    90  	args := make([]interface{}, 1+len(keys))
    91  	args[0] = "unwatch"
    92  	for i, key := range keys {
    93  		args[1+i] = key
    94  	}
    95  	cmd := NewStatusCmd(ctx, args...)
    96  	_ = c.Process(ctx, cmd)
    97  	return cmd
    98  }
    99  
   100  // Pipeline creates a pipeline. Usually it is more convenient to use Pipelined.
   101  func (c *Tx) Pipeline() Pipeliner {
   102  	pipe := Pipeline{
   103  		exec: func(ctx context.Context, cmds []Cmder) error {
   104  			return c.processPipelineHook(ctx, cmds)
   105  		},
   106  	}
   107  	pipe.init()
   108  	return &pipe
   109  }
   110  
   111  // Pipelined executes commands queued in the fn outside of the transaction.
   112  // Use TxPipelined if you need transactional behavior.
   113  func (c *Tx) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
   114  	return c.Pipeline().Pipelined(ctx, fn)
   115  }
   116  
   117  // TxPipelined executes commands queued in the fn in the transaction.
   118  //
   119  // When using WATCH, EXEC will execute commands only if the watched keys
   120  // were not modified, allowing for a check-and-set mechanism.
   121  //
   122  // Exec always returns list of commands. If transaction fails
   123  // TxFailedErr is returned. Otherwise Exec returns an error of the first
   124  // failed command or nil.
   125  func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
   126  	return c.TxPipeline().Pipelined(ctx, fn)
   127  }
   128  
   129  // TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined.
   130  func (c *Tx) TxPipeline() Pipeliner {
   131  	pipe := Pipeline{
   132  		exec: func(ctx context.Context, cmds []Cmder) error {
   133  			cmds = wrapMultiExec(ctx, cmds)
   134  			return c.processTxPipelineHook(ctx, cmds)
   135  		},
   136  	}
   137  	pipe.init()
   138  	return &pipe
   139  }
   140  
   141  func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder {
   142  	if len(cmds) == 0 {
   143  		panic("not reached")
   144  	}
   145  	cmdsCopy := make([]Cmder, len(cmds)+2)
   146  	cmdsCopy[0] = NewStatusCmd(ctx, "multi")
   147  	copy(cmdsCopy[1:], cmds)
   148  	cmdsCopy[len(cmdsCopy)-1] = NewSliceCmd(ctx, "exec")
   149  	return cmdsCopy
   150  }
   151  

View as plain text