Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/ring.go')
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/ring.go  106
1 file changed, 71 insertions(+), 35 deletions(-)
diff --git a/src/dma/vendor/github.com/go-redis/redis/ring.go b/src/dma/vendor/github.com/go-redis/redis/ring.go
index ef855115..250e5f64 100644
--- a/src/dma/vendor/github.com/go-redis/redis/ring.go
+++ b/src/dma/vendor/github.com/go-redis/redis/ring.go
@@ -68,6 +68,8 @@ type RingOptions struct {
 	WriteTimeout time.Duration
 
 	PoolSize           int
+	MinIdleConns       int
+	MaxConnAge         time.Duration
 	PoolTimeout        time.Duration
 	IdleTimeout        time.Duration
 	IdleCheckFrequency time.Duration
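The two pool options added here are forwarded to each shard's client options in the next hunk. A minimal configuration sketch, not part of this change; shard names, addresses, and values are illustrative:

// Sketch only; assumes import ( "time"; "github.com/go-redis/redis" ).
ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"shard1": ":6379", // hypothetical shard addresses
		"shard2": ":6380",
	},
	PoolSize:     20,
	MinIdleConns: 5,                // keep at least 5 idle connections per shard
	MaxConnAge:   30 * time.Minute, // recycle connections older than this
})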
@@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options {
 		WriteTimeout: opt.WriteTimeout,
 
 		PoolSize:           opt.PoolSize,
+		MinIdleConns:       opt.MinIdleConns,
+		MaxConnAge:         opt.MaxConnAge,
 		PoolTimeout:        opt.PoolTimeout,
 		IdleTimeout:        opt.IdleTimeout,
 		IdleCheckFrequency: opt.IdleCheckFrequency,
@@ -315,12 +319,12 @@ func (c *ringShards) Close() error {
 //------------------------------------------------------------------------------
 
-// Ring is a Redis client that uses constistent hashing to distribute
+// Ring is a Redis client that uses consistent hashing to distribute
 // keys across multiple Redis servers (shards). It's safe for
 // concurrent use by multiple goroutines.
 //
 // Ring monitors the state of each shard and removes dead shards from
-// the ring. When shard comes online it is added back to the ring. This
+// the ring. When a shard comes online it is added back to the ring. This
 // gives you maximum availability and partition tolerance, but no
 // consistency between different shards or even clients. Each client
 // uses shards that are available to the client and does not do any
@@ -338,6 +342,7 @@ type Ring struct {
 	shards        *ringShards
 	cmdsInfoCache *cmdsInfoCache
 
+	process         func(Cmder) error
 	processPipeline func([]Cmder) error
 }
@@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring {
 	}
 	ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
 
+	ring.process = ring.defaultProcess
 	ring.processPipeline = ring.defaultProcessPipeline
 	ring.cmdable.setProcessor(ring.Process)
@@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats {
 		acc.Misses += s.Misses
 		acc.Timeouts += s.Timeouts
 		acc.TotalConns += s.TotalConns
-		acc.FreeConns += s.FreeConns
+		acc.IdleConns += s.IdleConns
 	}
 	return &acc
 }
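With the pool stats field renamed from FreeConns to IdleConns, callers that read the aggregated ring statistics use the new name. Sketch only, reusing the hypothetical ring value from the earlier example (assumes import "fmt"):

stats := ring.PoolStats() // aggregated across all shards
fmt.Printf("hits=%d misses=%d timeouts=%d total=%d idle=%d\n",
	stats.Hits, stats.Misses, stats.Timeouts, stats.TotalConns, stats.IdleConns)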
@@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
 	return c.shards.GetByKey(firstKey)
 }
 
-func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
-	c.ForEachShard(func(c *Client) error {
-		c.WrapProcess(fn)
-		return nil
-	})
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(args ...interface{}) *Cmd {
+	cmd := NewCmd(args...)
+	c.Process(cmd)
+	return cmd
+}
+
+func (c *Ring) WrapProcess(
+	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
+	c.process = fn(c.process)
 }
 
 func (c *Ring) Process(cmd Cmder) error {
-	shard, err := c.cmdShard(cmd)
-	if err != nil {
-		cmd.setErr(err)
-		return err
+	return c.process(cmd)
+}
+
+func (c *Ring) defaultProcess(cmd Cmder) error {
+	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+		if attempt > 0 {
+			time.Sleep(c.retryBackoff(attempt))
+		}
+
+		shard, err := c.cmdShard(cmd)
+		if err != nil {
+			cmd.setErr(err)
+			return err
+		}
+
+		err = shard.Client.Process(cmd)
+		if err == nil {
+			return nil
+		}
+		if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+			return err
+		}
 	}
-	return shard.Client.Process(cmd)
+	return cmd.Err()
 }
 
 func (c *Ring) Pipeline() Pipeliner {
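WrapProcess now wraps the ring-level process function, which carries the shard lookup and retry loop shown above, instead of wrapping each shard client individually; Do simply builds a Cmd from raw args and runs it through Process. A sketch of both, again reusing the hypothetical ring value (assumes imports "log" and "time"; the key and value are illustrative):

ring.WrapProcess(func(old func(redis.Cmder) error) func(redis.Cmder) error {
	return func(cmd redis.Cmder) error {
		start := time.Now()
		err := old(cmd) // old is defaultProcess unless wrapped again
		log.Printf("%v took %s err=%v", cmd, time.Since(start), err)
		return err
	}
})

cmd := ring.Do("set", "key", "value")
if err := cmd.Err(); err != nil {
	log.Println("set failed:", err)
}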
@@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
 			time.Sleep(c.retryBackoff(attempt))
 		}
 
+		var mu sync.Mutex
 		var failedCmdsMap map[string][]Cmder
+		var wg sync.WaitGroup
 
 		for hash, cmds := range cmdsMap {
-			shard, err := c.shards.GetByHash(hash)
-			if err != nil {
-				setCmdsErr(cmds, err)
-				continue
-			}
+			wg.Add(1)
+			go func(hash string, cmds []Cmder) {
+				defer wg.Done()
+
+				shard, err := c.shards.GetByHash(hash)
+				if err != nil {
+					setCmdsErr(cmds, err)
+					return
+				}
 
-			cn, err := shard.Client.getConn()
-			if err != nil {
-				setCmdsErr(cmds, err)
-				continue
-			}
+				cn, err := shard.Client.getConn()
+				if err != nil {
+					setCmdsErr(cmds, err)
+					return
+				}
 
-			canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
-			if err == nil || internal.IsRedisError(err) {
-				shard.Client.connPool.Put(cn)
-				continue
-			}
-			shard.Client.connPool.Remove(cn)
+				canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
+				shard.Client.releaseConnStrict(cn, err)
 
-			if canRetry && internal.IsRetryableError(err, true) {
-				if failedCmdsMap == nil {
-					failedCmdsMap = make(map[string][]Cmder)
+				if canRetry && internal.IsRetryableError(err, true) {
+					mu.Lock()
+					if failedCmdsMap == nil {
+						failedCmdsMap = make(map[string][]Cmder)
+					}
+					failedCmdsMap[hash] = cmds
+					mu.Unlock()
 				}
-				failedCmdsMap[hash] = cmds
-			}
+			}(hash, cmds)
 		}
 
+		wg.Wait()
 		if len(failedCmdsMap) == 0 {
 			break
 		}
 		cmdsMap = failedCmdsMap
 	}
 
-	return firstCmdsErr(cmds)
+	return cmdsFirstErr(cmds)
 }
 
 func (c *Ring) TxPipeline() Pipeliner {
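The pipeline path above now fans each shard's command group out to its own goroutine, guards the map of retryable failures with a mutex, and waits on the WaitGroup before retrying. The caller-facing API is unchanged; a usage sketch with the hypothetical ring value from the earlier examples (assumes imports "fmt", "log", and "time"; the key is illustrative):

pipe := ring.Pipeline()
incr := pipe.Incr("pipeline_counter")
pipe.Expire("pipeline_counter", time.Hour)
if _, err := pipe.Exec(); err != nil { // per-shard groups are flushed concurrently
	log.Println("pipeline failed:", err)
}
fmt.Println(incr.Val())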