diff options
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/ring.go')
-rw-r--r-- | src/dma/vendor/github.com/go-redis/redis/ring.go | 106 |
1 files changed, 71 insertions, 35 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/ring.go b/src/dma/vendor/github.com/go-redis/redis/ring.go index ef855115..250e5f64 100644 --- a/src/dma/vendor/github.com/go-redis/redis/ring.go +++ b/src/dma/vendor/github.com/go-redis/redis/ring.go @@ -68,6 +68,8 @@ type RingOptions struct { WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options { WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, @@ -315,12 +319,12 @@ func (c *ringShards) Close() error { //------------------------------------------------------------------------------ -// Ring is a Redis client that uses constistent hashing to distribute +// Ring is a Redis client that uses consistent hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // // Ring monitors the state of each shard and removes dead shards from -// the ring. When shard comes online it is added back to the ring. This +// the ring. When a shard comes online it is added back to the ring. This // gives you maximum availability and partition tolerance, but no // consistency between different shards or even clients. Each client // uses shards that are available to the client and does not do any @@ -338,6 +342,7 @@ type Ring struct { shards *ringShards cmdsInfoCache *cmdsInfoCache + process func(Cmder) error processPipeline func([]Cmder) error } @@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring { } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats { acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns } return &acc } @@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shards.GetByKey(firstKey) } -func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.ForEachShard(func(c *Client) error { - c.WrapProcess(fn) - return nil - }) +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *Ring) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { + c.process = fn(c.process) } func (c *Ring) Process(cmd Cmder) error { - shard, err := c.cmdShard(cmd) - if err != nil { - cmd.setErr(err) - return err + return c.process(cmd) +} + +func (c *Ring) defaultProcess(cmd Cmder) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + shard, err := c.cmdShard(cmd) + if err != nil { + cmd.setErr(err) + return err + } + + err = shard.Client.Process(cmd) + if err == nil { + return nil + } + if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { + return err + } } - return shard.Client.Process(cmd) + return cmd.Err() } func (c *Ring) Pipeline() Pipeliner { @@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } + var mu sync.Mutex var failedCmdsMap map[string][]Cmder + var wg sync.WaitGroup for hash, cmds := range cmdsMap { - shard, err := c.shards.GetByHash(hash) - if err != nil { - setCmdsErr(cmds, err) - continue - } + wg.Add(1) + go func(hash string, cmds []Cmder) { + defer wg.Done() + + shard, err := c.shards.GetByHash(hash) + if err != nil { + setCmdsErr(cmds, err) + return + } - cn, err := shard.Client.getConn() - if err != nil { - setCmdsErr(cmds, err) - continue - } + cn, err := shard.Client.getConn() + if err != nil { + setCmdsErr(cmds, err) + return + } - canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - if err == nil || internal.IsRedisError(err) { - shard.Client.connPool.Put(cn) - continue - } - shard.Client.connPool.Remove(cn) + canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) + shard.Client.releaseConnStrict(cn, err) - if canRetry && internal.IsRetryableError(err, true) { - if failedCmdsMap == nil { - failedCmdsMap = make(map[string][]Cmder) + if canRetry && internal.IsRetryableError(err, true) { + mu.Lock() + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) + } + failedCmdsMap[hash] = cmds + mu.Unlock() } - failedCmdsMap[hash] = cmds - } + }(hash, cmds) } + wg.Wait() if len(failedCmdsMap) == 0 { break } cmdsMap = failedCmdsMap } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *Ring) TxPipeline() Pipeliner { |