author    Tomofumi Hayashi <tohayash@redhat.com>  2019-04-27 20:38:39 +0900
committer Tomofumi Hayashi <tohayash@redhat.com>  2019-04-27 20:40:37 +0900
commit    4d11ca17d0f73f5bd783f45900118295fdfed46b (patch)
tree      cce8575b02ac850d2b30ec12a5c4083c48e85c6c /src/dma/vendor/github.com/go-redis
parent    07e4a96e4996f3d39b92dd601b3ed0d23bfbaa0c (diff)
barometer: update DMA's vendoring packages
Change-Id: I0578b094f1ecdaed20c906be2ba29d51b8089d7c
Signed-off-by: Tomofumi Hayashi <tohayash@redhat.com>
Diffstat (limited to 'src/dma/vendor/github.com/go-redis')
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/.travis.yml                            |    3
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md                           |   13
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/Makefile                               |    4
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/cluster.go                             |  499
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/command.go                             |  993
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/commands.go                            |  380
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/error.go                     |   30
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go                  |   51
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go                  |  132
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go               |  162
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go         |  113
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go               |  159
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go  |   64
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go                  |    4
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go                |   10
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/options.go                             |   27
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/parser.go                              |  394
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/pipeline.go                            |    7
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/pubsub.go                              |  127
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/redis.go                               |  173
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/result.go                              |    2
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/ring.go                                |  106
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/sentinel.go                            |  252
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/tx.go                                  |    4
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/universal.go                           |  102
25 files changed, 2338 insertions(+), 1473 deletions(-)
diff --git a/src/dma/vendor/github.com/go-redis/redis/.travis.yml b/src/dma/vendor/github.com/go-redis/redis/.travis.yml
index 39ffc2be..6b110b4c 100644
--- a/src/dma/vendor/github.com/go-redis/redis/.travis.yml
+++ b/src/dma/vendor/github.com/go-redis/redis/.travis.yml
@@ -5,10 +5,9 @@ services:
- redis-server
go:
- - 1.7.x
- - 1.8.x
- 1.9.x
- 1.10.x
+ - 1.11.x
- tip
matrix:
diff --git a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md
index cb0e1b8e..19645661 100644
--- a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md
+++ b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md
@@ -1,5 +1,18 @@
# Changelog
+## Unreleased
+
+- Cluster and Ring pipelines process commands for each node in its own goroutine.
+
+## 6.14
+
+- Added Options.MinIdleConns.
+- Added Options.MaxConnAge.
+- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
+- Add Client.Do to simplify creating custom commands.
+- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
+- Lower memory usage.
+
## v6.13
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.
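The 6.14 entries above name several new public APIs. As a quick orientation, here is a minimal usage sketch of Options.MinIdleConns, Options.MaxConnAge, Client.Do, and the Cmd.Int helper; it assumes a reachable Redis at localhost:6379, and the key name is illustrative, not taken from this diff.

package main

import (
	"fmt"
	"time"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr:         "localhost:6379",
		MinIdleConns: 4,                // keep a few idle connections warm (new in 6.14)
		MaxConnAge:   30 * time.Minute, // recycle connections after a fixed age (new in 6.14)
	})
	defer client.Close()

	// Client.Do builds a generic Cmd, handy for commands without a typed helper.
	if err := client.Do("set", "counter", 41).Err(); err != nil {
		panic(err)
	}

	// The new Cmd.Int helper converts the generic reply.
	n, err := client.Do("incr", "counter").Int()
	if err != nil {
		panic(err)
	}
	fmt.Println("counter =", n) // counter = 42
}
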
diff --git a/src/dma/vendor/github.com/go-redis/redis/Makefile b/src/dma/vendor/github.com/go-redis/redis/Makefile
index 1fbdac91..fa3b4e00 100644
--- a/src/dma/vendor/github.com/go-redis/redis/Makefile
+++ b/src/dma/vendor/github.com/go-redis/redis/Makefile
@@ -3,6 +3,8 @@ all: testdeps
go test ./... -short -race
env GOOS=linux GOARCH=386 go test ./...
go vet
+ go get github.com/gordonklaus/ineffassign
+ ineffassign .
testdeps: testdata/redis/src/redis-server
@@ -13,7 +15,7 @@ bench: testdeps
testdata/redis:
mkdir -p $@
- wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@
+ wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis
sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile
diff --git a/src/dma/vendor/github.com/go-redis/redis/cluster.go b/src/dma/vendor/github.com/go-redis/redis/cluster.go
index 7a1af143..0cecc62c 100644
--- a/src/dma/vendor/github.com/go-redis/redis/cluster.go
+++ b/src/dma/vendor/github.com/go-redis/redis/cluster.go
@@ -3,11 +3,11 @@ package redis
import (
"context"
"crypto/tls"
- "errors"
"fmt"
"math"
"math/rand"
"net"
+ "runtime"
"sort"
"sync"
"sync/atomic"
@@ -17,7 +17,6 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
- "github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@@ -49,14 +48,18 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
+ // Optional hook that is called when a new node is created.
+ OnNewNode func(*Client)
+
// Following options are copied from Options struct.
OnConnect func(*Conn) error
+ Password string
+
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
- Password string
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -64,6 +67,8 @@ type ClusterOptions struct {
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -78,10 +83,14 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8
}
- if opt.RouteByLatency || opt.RouteRandomly {
+ if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
opt.ReadOnly = true
}
+ if opt.PoolSize == 0 {
+ opt.PoolSize = 5 * runtime.NumCPU()
+ }
+
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
@@ -125,10 +134,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
- PoolSize: opt.PoolSize,
- PoolTimeout: opt.PoolTimeout,
- IdleTimeout: opt.IdleTimeout,
-
+ PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
+ PoolTimeout: opt.PoolTimeout,
+ IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
TLSConfig: opt.TLSConfig,
@@ -157,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency()
}
+ if clOpt.OnNewNode != nil {
+ clOpt.OnNewNode(node.Client)
+ }
+
return &node
}
@@ -228,8 +242,6 @@ type clusterNodes struct {
clusterAddrs []string
closed bool
- nodeCreateGroup singleflight.Group
-
_generation uint32 // atomic
}
@@ -332,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil
}
- v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
- node := newClusterNode(c.opt, addr)
- return node, nil
- })
-
c.mu.Lock()
defer c.mu.Unlock()
@@ -346,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr]
if ok {
- _ = v.(*clusterNode).Close()
return node, err
}
- node = v.(*clusterNode)
+
+ node = newClusterNode(c.opt, addr)
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
- if err == nil {
- c.clusterAddrs = append(c.clusterAddrs, addr)
- }
+ c.clusterAddrs = append(c.clusterAddrs, addr)
c.allNodes[addr] = node
return node, err
@@ -429,13 +434,15 @@ func newClusterState(
createdAt: time.Now(),
}
- isLoopbackOrigin := isLoopbackAddr(origin)
+ originHost, _, _ := net.SplitHostPort(origin)
+ isLoopbackOrigin := isLoopback(originHost)
+
for _, slot := range slots {
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
- if !isLoopbackOrigin && useOriginAddr(origin, addr) {
- addr = origin
+ if !isLoopbackOrigin {
+ addr = replaceLoopbackHost(addr, originHost)
}
node, err := c.nodes.GetOrCreate(addr)
@@ -469,6 +476,33 @@ func newClusterState(
return &c, nil
}
+func replaceLoopbackHost(nodeAddr, originHost string) string {
+ nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+ if err != nil {
+ return nodeAddr
+ }
+
+ nodeIP := net.ParseIP(nodeHost)
+ if nodeIP == nil {
+ return nodeAddr
+ }
+
+ if !nodeIP.IsLoopback() {
+ return nodeAddr
+ }
+
+ // Use origin host which is not loopback and node port.
+ return net.JoinHostPort(originHost, nodePort)
+}
+
+func isLoopback(host string) bool {
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return true
+ }
+ return ip.IsLoopback()
+}
+
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) > 0 {
@@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
- break
+ return slave, nil
}
}
- return slave, nil
+
+ // All slaves are loading - use master.
+ return nodes[0], nil
}
}
@@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil
}
-func (c *clusterState) IsConsistent() bool {
- if c.nodes.opt.ClusterSlots != nil {
- return true
- }
- return len(c.Masters) <= len(c.Slaves)
-}
-
//------------------------------------------------------------------------------
type clusterStateHolder struct {
load func() (*clusterState, error)
- state atomic.Value
-
- firstErrMu sync.RWMutex
- firstErr error
-
+ state atomic.Value
reloading uint32 // atomic
}
@@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
}
func (c *clusterStateHolder) Reload() (*clusterState, error) {
- state, err := c.reload()
- if err != nil {
- return nil, err
- }
- if !state.IsConsistent() {
- time.AfterFunc(time.Second, c.LazyReload)
- }
- return state, nil
-}
-
-func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
- c.firstErrMu.Lock()
- if c.firstErr == nil {
- c.firstErr = err
- }
- c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- for {
- state, err := c.reload()
- if err != nil {
- return
- }
- time.Sleep(100 * time.Millisecond)
- if state.IsConsistent() {
- return
- }
+ _, err := c.Reload()
+ if err != nil {
+ return
}
+ time.Sleep(100 * time.Millisecond)
}()
}
@@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
}
return state, nil
}
-
- c.firstErrMu.RLock()
- err := c.firstErr
- c.firstErrMu.RUnlock()
- if err != nil {
- return nil, err
- }
-
- return nil, errors.New("redis: cluster has no state")
+ return c.Reload()
}
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.processTxPipeline = c.defaultProcessTxPipeline
c.init()
-
- _, _ = c.state.Reload()
- _, _ = c.cmdsInfoCache.Get()
-
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
-// ReloadState reloads cluster state. It calls ClusterSlots func
+func (c *ClusterClient) init() {
+ c.cmdable.setProcessor(c.Process)
+}
+
+// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
-func (c *ClusterClient) init() {
- c.cmdable.setProcessor(c.Process)
-}
-
func (c *ClusterClient) Context() context.Context {
if c.ctx != nil {
return c.ctx
@@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int {
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+ args := cmd.Args()
+ if args[0] == "cluster" && args[1] == "getkeysinslot" {
+ return args[2].(int)
+ }
+
cmdInfo := c.cmdInfo(cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
@@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
}
cmdInfo := c.cmdInfo(cmd.Name())
- slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
+ slot := c.cmdSlot(cmd)
- if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
+ if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot)
return slot, node, err
@@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if err == nil {
break
}
-
- if internal.IsRetryableError(err, true) {
+ if err != Nil {
c.state.LazyReload()
- continue
}
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
- c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
@@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
- if err == pool.ErrClosed {
+ if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node, err = c.slotMasterNode(slot)
if err != nil {
return err
@@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
+ if internal.IsRetryableError(err, true) {
+ continue
+ }
+
return err
}
@@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *ClusterClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
func (c *ClusterClient) WrapProcess(
fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
@@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
if err == nil {
break
}
+ if err != Nil {
+ c.state.LazyReload()
+ }
- // If slave is loading - read from master.
+ // If slave is loading - pick another node.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
- continue
- }
-
- if internal.IsRetryableError(err, true) {
- c.state.LazyReload()
-
- // First retry the same node.
- if attempt == 0 {
- continue
- }
-
- // Second try random node.
- node, err = c.nodes.Random()
- if err != nil {
- break
- }
+ node = nil
continue
}
@@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
- c.state.LazyReload()
-
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
@@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
continue
}
- if err == pool.ErrClosed {
+ if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node = nil
continue
}
+ if internal.IsRetryableError(err, true) {
+ // First retry the same node.
+ if attempt == 0 {
+ continue
+ }
+
+ // Second try random node.
+ node, err = c.nodes.Random()
+ if err != nil {
+ break
+ }
+ continue
+ }
+
break
}
@@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline(
}
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
- cmdsMap, err := c.mapCmdsByNode(cmds)
+ cmdsMap := newCmdsMap()
+ err := c.mapCmdsByNode(cmds, cmdsMap)
if err != nil {
setCmdsErr(cmds, err)
return err
@@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- failedCmds := make(map[*clusterNode][]Cmder)
+ failedCmds := newCmdsMap()
+ var wg sync.WaitGroup
- for node, cmds := range cmdsMap {
- cn, err := node.Client.getConn()
- if err != nil {
- if err == pool.ErrClosed {
- c.remapCmds(cmds, failedCmds)
- } else {
- setCmdsErr(cmds, err)
+ for node, cmds := range cmdsMap.m {
+ wg.Add(1)
+ go func(node *clusterNode, cmds []Cmder) {
+ defer wg.Done()
+
+ cn, err := node.Client.getConn()
+ if err != nil {
+ if err == pool.ErrClosed {
+ c.mapCmdsByNode(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
+ return
}
- continue
- }
- err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
- if err == nil || internal.IsRedisError(err) {
- node.Client.connPool.Put(cn)
- } else {
- node.Client.connPool.Remove(cn)
- }
+ err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+ node.Client.releaseConnStrict(cn, err)
+ }(node, cmds)
}
- if len(failedCmds) == 0 {
+ wg.Wait()
+ if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
-func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
+type cmdsMap struct {
+ mu sync.Mutex
+ m map[*clusterNode][]Cmder
+}
+
+func newCmdsMap() *cmdsMap {
+ return &cmdsMap{
+ m: make(map[*clusterNode][]Cmder),
+ }
+}
+
+func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
state, err := c.state.Get()
if err != nil {
setCmdsErr(cmds, err)
- return nil, err
+ return err
}
- cmdsMap := make(map[*clusterNode][]Cmder)
cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
var node *clusterNode
@@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
node, err = state.slotMasterNode(slot)
}
if err != nil {
- return nil, err
+ return err
}
- cmdsMap[node] = append(cmdsMap[node], cmd)
+ cmdsMap.mu.Lock()
+ cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
+ cmdsMap.mu.Unlock()
}
- return cmdsMap, nil
+ return nil
}
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
return true
}
-func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
- remappedCmds, err := c.mapCmdsByNode(cmds)
- if err != nil {
- setCmdsErr(cmds, err)
- return
- }
-
- for node, cmds := range remappedCmds {
- failedCmds[node] = cmds
- }
-}
-
func (c *ClusterClient) pipelineProcessCmds(
- node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
-
- err := writeCmd(cn, cmds...)
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
if err != nil {
setCmdsErr(cmds, err)
- failedCmds[node] = cmds
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = cmds
+ failedCmds.mu.Unlock()
return err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- return c.pipelineReadCmds(cn, cmds, failedCmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return c.pipelineReadCmds(node, rd, cmds, failedCmds)
+ })
+ return err
}
func (c *ClusterClient) pipelineReadCmds(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
+ var firstErr error
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err == nil {
continue
}
@@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds(
continue
}
- return err
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], cmd)
+ failedCmds.mu.Unlock()
+ if firstErr == nil {
+ firstErr = err
+ }
}
- return nil
+ return firstErr
}
func (c *ClusterClient) checkMovedErr(
- cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+ cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
moved, ask, addr := internal.IsMovedError(err)
@@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
- failedCmds[node] = append(failedCmds[node], cmd)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], cmd)
+ failedCmds.mu.Unlock()
return true
}
@@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
- failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
+ failedCmds.mu.Unlock()
return true
}
@@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- failedCmds := make(map[*clusterNode][]Cmder)
+ failedCmds := newCmdsMap()
+ var wg sync.WaitGroup
for node, cmds := range cmdsMap {
- cn, err := node.Client.getConn()
- if err != nil {
- if err == pool.ErrClosed {
- c.remapCmds(cmds, failedCmds)
- } else {
- setCmdsErr(cmds, err)
+ wg.Add(1)
+ go func(node *clusterNode, cmds []Cmder) {
+ defer wg.Done()
+
+ cn, err := node.Client.getConn()
+ if err != nil {
+ if err == pool.ErrClosed {
+ c.mapCmdsByNode(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
+ return
}
- continue
- }
- err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
- if err == nil || internal.IsRedisError(err) {
- node.Client.connPool.Put(cn)
- } else {
- node.Client.connPool.Remove(cn)
- }
+ err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
+ node.Client.releaseConnStrict(cn, err)
+ }(node, cmds)
}
- if len(failedCmds) == 0 {
+ wg.Wait()
+ if len(failedCmds.m) == 0 {
break
}
- cmdsMap = failedCmds
+ cmdsMap = failedCmds.m
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
@@ -1429,37 +1453,41 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
}
func (c *ClusterClient) txPipelineProcessCmds(
- node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
- setCmdsErr(cmds, err)
- failedCmds[node] = cmds
- return err
- }
-
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = cmds
+ failedCmds.mu.Unlock()
return err
}
- return pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return err
}
func (c *ClusterClient) txPipelineReadQueued(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ if err := statusCmd.readReply(rd); err != nil {
return err
}
for _, cmd := range cmds {
- err := statusCmd.readReply(cn)
+ err := statusCmd.readReply(rd)
if err == nil {
continue
}
@@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued(
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil
}
-func (c *ClusterClient) pubSub(channels []string) *PubSub {
+func (c *ClusterClient) pubSub() *PubSub {
var node *clusterNode
pubsub := &PubSub{
opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) {
- if node == nil {
- var slot int
- if len(channels) > 0 {
- slot = hashtag.Slot(channels[0])
- } else {
- slot = -1
- }
+ if node != nil {
+ panic("node != nil")
+ }
- masterNode, err := c.slotMasterNode(slot)
- if err != nil {
- return nil, err
- }
- node = masterNode
+ slot := hashtag.Slot(channels[0])
+
+ var err error
+ node, err = c.slotMasterNode(slot)
+ if err != nil {
+ return nil, err
}
- return node.Client.newConn()
+
+ cn, err := node.Client.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ return cn, nil
},
closeConn: func(cn *pool.Conn) error {
- return node.Client.connPool.CloseConn(cn)
+ err := node.Client.connPool.CloseConn(cn)
+ node = nil
+ return err
},
}
pubsub.init()
+
return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
- pubsub := c.pubSub(channels)
+ pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
@@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
- pubsub := c.pubSub(channels)
+ pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
return pubsub
}
-func useOriginAddr(originAddr, nodeAddr string) bool {
- nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
- if err != nil {
- return false
- }
-
- nodeIP := net.ParseIP(nodeHost)
- if nodeIP == nil {
- return false
- }
-
- if !nodeIP.IsLoopback() {
- return false
- }
-
- _, originPort, err := net.SplitHostPort(originAddr)
- if err != nil {
- return false
- }
-
- return nodePort == originPort
-}
-
-func isLoopbackAddr(addr string) bool {
- host, _, err := net.SplitHostPort(addr)
- if err != nil {
- return false
- }
-
- ip := net.ParseIP(host)
- if ip == nil {
- return false
- }
-
- return ip.IsLoopback()
-}
-
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {
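The cluster.go diff above adds two small public surfaces: the ClusterOptions.OnNewNode hook and ClusterClient.Do. A hedged sketch of how they combine follows; the addresses and the logging callback are assumptions for illustration, and note that in this version Cmd.String returns (string, error).

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	cluster := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":7000", ":7001", ":7002"},
		// OnNewNode fires once per node client the cluster client creates,
		// e.g. to attach logging or metrics.
		OnNewNode: func(c *redis.Client) {
			fmt.Println("new node client:", c.Options().Addr)
		},
	})
	defer cluster.Close()

	// Do creates a Cmd from the args and routes it like any other command.
	val, err := cluster.Do("get", "key").String()
	if err != nil && err != redis.Nil {
		panic(err)
	}
	fmt.Println("key =", val)
}
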
diff --git a/src/dma/vendor/github.com/go-redis/redis/command.go b/src/dma/vendor/github.com/go-redis/redis/command.go
index 11472bec..cb4f94b1 100644
--- a/src/dma/vendor/github.com/go-redis/redis/command.go
+++ b/src/dma/vendor/github.com/go-redis/redis/command.go
@@ -1,16 +1,14 @@
package redis
import (
- "bytes"
"fmt"
+ "net"
"strconv"
"strings"
"time"
"github.com/go-redis/redis/internal"
- "github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
- "github.com/go-redis/redis/internal/util"
)
type Cmder interface {
@@ -18,13 +16,12 @@ type Cmder interface {
Args() []interface{}
stringArg(int) string
- readReply(*pool.Conn) error
+ readReply(rd *proto.Reader) error
setErr(error)
readTimeout() *time.Duration
Err() error
- fmt.Stringer
}
func setCmdsErr(cmds []Cmder, e error) {
@@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) {
}
}
-func firstCmdsErr(cmds []Cmder) error {
+func cmdsFirstErr(cmds []Cmder) error {
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
@@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error {
return nil
}
-func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
- cn.Wb.Reset()
+func writeCmd(wr *proto.Writer, cmds ...Cmder) error {
for _, cmd := range cmds {
- if err := cn.Wb.Append(cmd.Args()); err != nil {
+ err := wr.WriteArgs(cmd.Args())
+ if err != nil {
return err
}
}
-
- _, err := cn.Write(cn.Wb.Bytes())
- return err
+ return nil
}
func cmdString(cmd Cmder, val interface{}) string {
@@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) {
return cmd.val, cmd.err
}
-func (cmd *Cmd) String() string {
- return cmdString(cmd, cmd.val)
+func (cmd *Cmd) String() (string, error) {
+ if cmd.err != nil {
+ return "", cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case string:
+ return val, nil
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for String", val)
+ return "", err
+ }
}
-func (cmd *Cmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser)
+func (cmd *Cmd) Int() (int, error) {
if cmd.err != nil {
- return cmd.err
+ return 0, cmd.err
}
- if b, ok := cmd.val.([]byte); ok {
- // Bytes must be copied, because underlying memory is reused.
- cmd.val = string(b)
+ switch val := cmd.val.(type) {
+ case int64:
+ return int(val), nil
+ case string:
+ return strconv.Atoi(val)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Int", val)
+ return 0, err
}
- return nil
+}
+
+func (cmd *Cmd) Int64() (int64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return val, nil
+ case string:
+ return strconv.ParseInt(val, 10, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Uint64() (uint64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return uint64(val), nil
+ case string:
+ return strconv.ParseUint(val, 10, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Uint64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Float64() (float64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return float64(val), nil
+ case string:
+ return strconv.ParseFloat(val, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Float64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Bool() (bool, error) {
+ if cmd.err != nil {
+ return false, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return val != 0, nil
+ case string:
+ return strconv.ParseBool(val)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Bool", val)
+ return false, err
+ }
+}
+
+func (cmd *Cmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadReply(sliceParser)
+ return cmd.err
+}
+
+// Implements proto.MultiBulkParse
+func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ vals := make([]interface{}, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(sliceParser)
+ if err != nil {
+ if err == Nil {
+ vals = append(vals, nil)
+ continue
+ }
+ if err, ok := err.(proto.RedisError); ok {
+ vals = append(vals, err)
+ continue
+ }
+ return nil, err
+ }
+
+ switch v := v.(type) {
+ case string:
+ vals = append(vals, v)
+ default:
+ vals = append(vals, v)
+ }
+ }
+ return vals, nil
}
//------------------------------------------------------------------------------
@@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *SliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *SliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(sliceParser)
+ v, cmd.err = rd.ReadArrayReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StatusCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadStringReply()
+func (cmd *StatusCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadString()
return cmd.err
}
@@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *IntCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadIntReply()
+func (cmd *IntCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadIntReply()
return cmd.err
}
@@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *DurationCmd) readReply(cn *pool.Conn) error {
+func (cmd *DurationCmd) readReply(rd *proto.Reader) error {
var n int64
- n, cmd.err = cn.Rd.ReadIntReply()
+ n, cmd.err = rd.ReadIntReply()
if cmd.err != nil {
return cmd.err
}
@@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
+func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(timeParser)
+ v, cmd.err = rd.ReadArrayReply(timeParser)
if cmd.err != nil {
return cmd.err
}
@@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d elements, expected 2", n)
+ }
+
+ sec, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ microsec, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ return time.Unix(sec, microsec*1000), nil
+}
+
//------------------------------------------------------------------------------
type BoolCmd struct {
@@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string {
return cmdString(cmd, cmd.val)
}
-var ok = []byte("OK")
-
-func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadReply(nil)
+ v, cmd.err = rd.ReadReply(nil)
// `SET key value NX` returns nil when key already exists. But
// `SETNX key value` returns bool (0/1). So convert nil to bool.
// TODO: is this okay?
@@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
case int64:
cmd.val = v == 1
return nil
- case []byte:
- cmd.val = bytes.Equal(v, ok)
+ case string:
+ cmd.val = v == "OK"
return nil
default:
cmd.err = fmt.Errorf("got %T, wanted int64 or string", v)
@@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
type StringCmd struct {
baseCmd
- val []byte
+ val string
}
var _ Cmder = (*StringCmd)(nil)
@@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd {
}
func (cmd *StringCmd) Val() string {
- return util.BytesToString(cmd.val)
+ return cmd.val
}
func (cmd *StringCmd) Result() (string, error) {
@@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) {
}
func (cmd *StringCmd) Bytes() ([]byte, error) {
- return cmd.val, cmd.err
+ return []byte(cmd.val), cmd.err
+}
+
+func (cmd *StringCmd) Int() (int, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ return strconv.Atoi(cmd.Val())
}
func (cmd *StringCmd) Int64() (int64, error) {
@@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error {
if cmd.err != nil {
return cmd.err
}
- return proto.Scan(cmd.val, val)
+ return proto.Scan([]byte(cmd.val), val)
}
func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadBytesReply()
+func (cmd *StringCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadString()
return cmd.err
}
@@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *FloatCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadFloatReply()
+func (cmd *FloatCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadFloatReply()
return cmd.err
}
@@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
return proto.ScanSlice(cmd.Val(), container)
}
-func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser)
+ v, cmd.err = rd.ReadArrayReply(stringSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ ss := make([]string, 0, n)
+ for i := int64(0); i < n; i++ {
+ s, err := rd.ReadString()
+ if err == Nil {
+ ss = append(ss, "")
+ } else if err != nil {
+ return nil, err
+ } else {
+ ss = append(ss, s)
+ }
+ }
+ return ss, nil
+}
+
//------------------------------------------------------------------------------
type BoolSliceCmd struct {
@@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser)
+ v, cmd.err = rd.ReadArrayReply(boolSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ bools := make([]bool, 0, n)
+ for i := int64(0); i < n; i++ {
+ n, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ bools = append(bools, n == 1)
+ }
+ return bools, nil
+}
+
//------------------------------------------------------------------------------
type StringStringMapCmd struct {
@@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringStringMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]string, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = value
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
type StringIntMapCmd struct {
@@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringIntMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]int64, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ n, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = n
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
type StringStructMapCmd struct {
@@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringStructMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
return nil
}
-//------------------------------------------------------------------------------
+// Implements proto.MultiBulkParse
+func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]struct{}, n)
+ for i := int64(0); i < n; i++ {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
-type XStream struct {
- Stream string
- Messages []*XMessage
+ m[key] = struct{}{}
+ }
+ return m, nil
}
+//------------------------------------------------------------------------------
+
type XMessage struct {
ID string
Values map[string]interface{}
}
+type XMessageSliceCmd struct {
+ baseCmd
+
+ val []XMessage
+}
+
+var _ Cmder = (*XMessageSliceCmd)(nil)
+
+func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
+ return &XMessageSliceCmd{
+ baseCmd: baseCmd{_args: args},
+ }
+}
+
+func (cmd *XMessageSliceCmd) Val() []XMessage {
+ return cmd.val
+}
+
+func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *XMessageSliceCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
+ var v interface{}
+ v, cmd.err = rd.ReadArrayReply(xMessageSliceParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = v.([]XMessage)
+ return nil
+}
+
+// Implements proto.MultiBulkParse
+func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ msgs := make([]XMessage, 0, n)
+ for i := int64(0); i < n; i++ {
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ v, err := rd.ReadArrayReply(stringInterfaceMapParser)
+ if err != nil {
+ return nil, err
+ }
+
+ msgs = append(msgs, XMessage{
+ ID: id,
+ Values: v.(map[string]interface{}),
+ })
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+ return msgs, nil
+}
+
+// Implements proto.MultiBulkParse
+func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]interface{}, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = value
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
+type XStream struct {
+ Stream string
+ Messages []XMessage
+}
+
type XStreamSliceCmd struct {
baseCmd
- val []*XStream
+ val []XStream
}
var _ Cmder = (*XStreamSliceCmd)(nil)
@@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd {
}
}
-func (cmd *XStreamSliceCmd) Val() []*XStream {
+func (cmd *XStreamSliceCmd) Val() []XStream {
return cmd.val
}
-func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) {
+func (cmd *XStreamSliceCmd) Result() ([]XStream, error) {
return cmd.val, cmd.err
}
@@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser)
+ v, cmd.err = rd.ReadArrayReply(xStreamSliceParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]*XStream)
+ cmd.val = v.([]XStream)
return nil
}
// Implements proto.MultiBulkParse
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- xx := make([]*XStream, n)
+ ret := make([]XStream, 0, n)
for i := int64(0); i < n; i++ {
- v, err := rd.ReadArrayReply(xStreamParser)
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d, wanted 2", n)
+ }
+
+ stream, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ v, err := rd.ReadArrayReply(xMessageSliceParser)
+ if err != nil {
+ return nil, err
+ }
+
+ ret = append(ret, XStream{
+ Stream: stream,
+ Messages: v.([]XMessage),
+ })
+ return nil, nil
+ })
if err != nil {
return nil, err
}
- xx[i] = v.(*XStream)
}
- return xx, nil
+ return ret, nil
}
-// Implements proto.MultiBulkParse
-func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
- if n != 2 {
- return nil, fmt.Errorf("got %d, wanted 2", n)
+//------------------------------------------------------------------------------
+
+type XPending struct {
+ Count int64
+ Lower string
+ Higher string
+ Consumers map[string]int64
+}
+
+type XPendingCmd struct {
+ baseCmd
+ val *XPending
+}
+
+var _ Cmder = (*XPendingCmd)(nil)
+
+func NewXPendingCmd(args ...interface{}) *XPendingCmd {
+ return &XPendingCmd{
+ baseCmd: baseCmd{_args: args},
}
+}
+
+func (cmd *XPendingCmd) Val() *XPending {
+ return cmd.val
+}
+
+func (cmd *XPendingCmd) Result() (*XPending, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *XPendingCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
+ var info interface{}
+ info, cmd.err = rd.ReadArrayReply(xPendingParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = info.(*XPending)
+ return nil
+}
- stream, err := rd.ReadStringReply()
+func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 4 {
+ return nil, fmt.Errorf("got %d, wanted 4", n)
+ }
+
+ count, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
- v, err := rd.ReadArrayReply(xMessageSliceParser)
- if err != nil {
+ lower, err := rd.ReadString()
+ if err != nil && err != Nil {
return nil, err
}
- return &XStream{
- Stream: stream,
- Messages: v.([]*XMessage),
- }, nil
+ higher, err := rd.ReadString()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ pending := &XPending{
+ Count: count,
+ Lower: lower,
+ Higher: higher,
+ }
+ _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ for i := int64(0); i < n; i++ {
+ _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d, wanted 2", n)
+ }
+
+ consumerName, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ consumerPending, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ if pending.Consumers == nil {
+ pending.Consumers = make(map[string]int64)
+ }
+ pending.Consumers[consumerName] = consumerPending
+
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+ return nil, nil
+ })
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ return pending, nil
}
//------------------------------------------------------------------------------
-type XMessageSliceCmd struct {
- baseCmd
+type XPendingExt struct {
+ Id string
+ Consumer string
+ Idle time.Duration
+ RetryCount int64
+}
- val []*XMessage
+type XPendingExtCmd struct {
+ baseCmd
+ val []XPendingExt
}
-var _ Cmder = (*XMessageSliceCmd)(nil)
+var _ Cmder = (*XPendingExtCmd)(nil)
-func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
- return &XMessageSliceCmd{
+func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd {
+ return &XPendingExtCmd{
baseCmd: baseCmd{_args: args},
}
}
-func (cmd *XMessageSliceCmd) Val() []*XMessage {
+func (cmd *XPendingExtCmd) Val() []XPendingExt {
return cmd.val
}
-func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) {
+func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) {
return cmd.val, cmd.err
}
-func (cmd *XMessageSliceCmd) String() string {
+func (cmd *XPendingExtCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
- var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
+func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
+ var info interface{}
+ info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]*XMessage)
+ cmd.val = info.([]XPendingExt)
return nil
}
-// Implements proto.MultiBulkParse
-func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- msgs := make([]*XMessage, n)
+func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ ret := make([]XPendingExt, 0, n)
for i := int64(0); i < n; i++ {
- v, err := rd.ReadArrayReply(xMessageParser)
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 4 {
+ return nil, fmt.Errorf("got %d, wanted 4", n)
+ }
+
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ consumer, err := rd.ReadString()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ idle, err := rd.ReadIntReply()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ retryCount, err := rd.ReadIntReply()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ ret = append(ret, XPendingExt{
+ Id: id,
+ Consumer: consumer,
+ Idle: time.Duration(idle) * time.Millisecond,
+ RetryCount: retryCount,
+ })
+ return nil, nil
+ })
if err != nil {
return nil, err
}
- msgs[i] = v.(*XMessage)
}
- return msgs, nil
+ return ret, nil
}
-// Implements proto.MultiBulkParse
-func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) {
- id, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
+//------------------------------------------------------------------------------
- v, err := rd.ReadArrayReply(xKeyValueParser)
- if err != nil {
- return nil, err
+//------------------------------------------------------------------------------
+
+type ZSliceCmd struct {
+ baseCmd
+
+ val []Z
+}
+
+var _ Cmder = (*ZSliceCmd)(nil)
+
+func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
+ return &ZSliceCmd{
+ baseCmd: baseCmd{_args: args},
}
+}
+
+func (cmd *ZSliceCmd) Val() []Z {
+ return cmd.val
+}
- return &XMessage{
- ID: id,
- Values: v.(map[string]interface{}),
- }, nil
+func (cmd *ZSliceCmd) Result() ([]Z, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *ZSliceCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
+ var v interface{}
+ v, cmd.err = rd.ReadArrayReply(zSliceParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = v.([]Z)
+ return nil
}
// Implements proto.MultiBulkParse
-func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) {
- values := make(map[string]interface{}, n)
+func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
+ var err error
+
+ z := &zz[i/2]
+
+ z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
- value, err := rd.ReadStringReply()
+ z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
-
- values[key] = value
}
- return values, nil
+ return zz, nil
}
//------------------------------------------------------------------------------
-type ZSliceCmd struct {
+type ZWithKeyCmd struct {
baseCmd
- val []Z
+ val ZWithKey
}
-var _ Cmder = (*ZSliceCmd)(nil)
+var _ Cmder = (*ZWithKeyCmd)(nil)
-func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
- return &ZSliceCmd{
+func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
+ return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
-func (cmd *ZSliceCmd) Val() []Z {
+func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
-func (cmd *ZSliceCmd) Result() ([]Z, error) {
- return cmd.val, cmd.err
+func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
+ return cmd.Val(), cmd.Err()
}
-func (cmd *ZSliceCmd) String() string {
+func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser)
+ v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]Z)
+ cmd.val = v.(ZWithKey)
return nil
}
+// Implements proto.MultiBulkParse
+func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 3 {
+ return nil, fmt.Errorf("got %d elements, expected 3", n)
+ }
+
+ var z ZWithKey
+ var err error
+
+ z.Key, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ z.Member, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ z.Score, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ return z, nil
+}
+
//------------------------------------------------------------------------------
type ScanCmd struct {
@@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.page)
}
-func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
- cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply()
+func (cmd *ScanCmd) readReply(rd *proto.Reader) error {
+ cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply()
return cmd.err
}
@@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
+func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser)
+ v, cmd.err = rd.ReadArrayReply(clusterSlotsParser)
if cmd.err != nil {
return cmd.err
}
@@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
+ slots := make([]ClusterSlot, n)
+ for i := 0; i < len(slots); i++ {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n < 2 {
+ err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
+ return nil, err
+ }
+
+ start, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ end, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes := make([]ClusterNode, n-2)
+ for j := 0; j < len(nodes); j++ {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n != 2 && n != 3 {
+ err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
+ return nil, err
+ }
+
+ ip, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ port, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes[j].Addr = net.JoinHostPort(ip, port)
+
+ if n == 3 {
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ nodes[j].Id = id
+ }
+ }
+
+ slots[i] = ClusterSlot{
+ Start: int(start),
+ End: int(end),
+ Nodes: nodes,
+ }
+ }
+ return slots, nil
+}
+
//------------------------------------------------------------------------------
// GeoLocation is used with GeoAdd to add geospatial location.
@@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string {
return cmdString(cmd, cmd.locations)
}
-func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
+ v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
if cmd.err != nil {
return cmd.err
}
@@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
return nil
}
+func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+ return func(rd *proto.Reader, n int64) (interface{}, error) {
+ var loc GeoLocation
+ var err error
+
+ loc.Name, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ if q.WithDist {
+ loc.Dist, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if q.WithGeoHash {
+ loc.GeoHash, err = rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if q.WithCoord {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n != 2 {
+ return nil, fmt.Errorf("got %d coordinates, expected 2", n)
+ }
+
+ loc.Longitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ loc.Latitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return &loc, nil
+ }
+}
+
+func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+ return func(rd *proto.Reader, n int64) (interface{}, error) {
+ locs := make([]GeoLocation, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(newGeoLocationParser(q))
+ if err != nil {
+ return nil, err
+ }
+ switch vv := v.(type) {
+ case string:
+ locs = append(locs, GeoLocation{
+ Name: vv,
+ })
+ case *GeoLocation:
+ locs = append(locs, *vv)
+ default:
+ return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
+ }
+ }
+ return locs, nil
+ }
+}
+
//------------------------------------------------------------------------------
type GeoPos struct {
@@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string {
return cmdString(cmd, cmd.positions)
}
-func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser)
+ v, cmd.err = rd.ReadArrayReply(geoPosSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
return nil
}
+func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ positions := make([]*GeoPos, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(geoPosParser)
+ if err != nil {
+ if err == Nil {
+ positions = append(positions, nil)
+ continue
+ }
+ return nil, err
+ }
+ switch v := v.(type) {
+ case *GeoPos:
+ positions = append(positions, v)
+ default:
+ return nil, fmt.Errorf("got %T, expected *GeoPos", v)
+ }
+ }
+ return positions, nil
+}
+
+func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
+ var pos GeoPos
+ var err error
+
+ pos.Longitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+
+ pos.Latitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+
+ return &pos, nil
+}
+
//------------------------------------------------------------------------------
type CommandInfo struct {
@@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
+func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser)
+ v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]*CommandInfo, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(commandInfoParser)
+ if err != nil {
+ return nil, err
+ }
+ vv := v.(*CommandInfo)
+		m[vv.Name] = vv
+	}
+ return m, nil
+}
+
+func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
+ var cmd CommandInfo
+ var err error
+
+ if n != 6 {
+ return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
+ }
+
+ cmd.Name, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ arity, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.Arity = int8(arity)
+
+ flags, err := rd.ReadReply(stringSliceParser)
+ if err != nil {
+ return nil, err
+ }
+ cmd.Flags = flags.([]string)
+
+ firstKeyPos, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.FirstKeyPos = int8(firstKeyPos)
+
+ lastKeyPos, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.LastKeyPos = int8(lastKeyPos)
+
+ stepCount, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.StepCount = int8(stepCount)
+
+ for _, flag := range cmd.Flags {
+ if flag == "readonly" {
+ cmd.ReadOnly = true
+ break
+ }
+ }
+
+ return &cmd, nil
+}
+
//------------------------------------------------------------------------------
type cmdsInfoCache struct {
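All of the `readReply` methods above now take the shared `*proto.Reader` instead of a `*pool.Conn`. That package stays internal, so application code reaches the same machinery through `Client.Do` and the `Cmd` helpers added in 6.14. A hedged sketch, assuming a `*redis.Client` named `client`:

    // Do builds a generic *redis.Cmd; the 6.14 helpers
    // (String, Int, Int64, Uint64, Float64, Bool) convert the raw reply.
    refcount, err := client.Do("object", "refcount", "mykey").Int64()
    if err != nil {
        panic(err) // redis.Nil when the key does not exist
    }
    fmt.Println("refcount:", refcount)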
diff --git a/src/dma/vendor/github.com/go-redis/redis/commands.go b/src/dma/vendor/github.com/go-redis/redis/commands.go
index dddf8acd..653e4abe 100644
--- a/src/dma/vendor/github.com/go-redis/redis/commands.go
+++ b/src/dma/vendor/github.com/go-redis/redis/commands.go
@@ -8,13 +8,6 @@ import (
"github.com/go-redis/redis/internal"
)
-func readTimeout(timeout time.Duration) time.Duration {
- if timeout == 0 {
- return 0
- }
- return timeout + 10*time.Second
-}
-
func usePrecise(dur time.Duration) bool {
return dur < time.Second || dur%time.Second != 0
}
@@ -172,16 +165,30 @@ type Cmdable interface {
SRem(key string, members ...interface{}) *IntCmd
SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd
- XAdd(stream, id string, els map[string]interface{}) *StringCmd
- XAddExt(opt *XAddExt) *StringCmd
- XLen(key string) *IntCmd
+ XAdd(a *XAddArgs) *StringCmd
+ XDel(stream string, ids ...string) *IntCmd
+ XLen(stream string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
XRevRange(stream string, start, stop string) *XMessageSliceCmd
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
- XRead(streams ...string) *XStreamSliceCmd
- XReadN(count int64, streams ...string) *XStreamSliceCmd
- XReadExt(opt *XReadExt) *XStreamSliceCmd
+ XRead(a *XReadArgs) *XStreamSliceCmd
+ XReadStreams(streams ...string) *XStreamSliceCmd
+ XGroupCreate(stream, group, start string) *StatusCmd
+ XGroupCreateMkStream(stream, group, start string) *StatusCmd
+ XGroupSetID(stream, group, start string) *StatusCmd
+ XGroupDestroy(stream, group string) *IntCmd
+ XGroupDelConsumer(stream, group, consumer string) *IntCmd
+ XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd
+ XAck(stream, group string, ids ...string) *IntCmd
+ XPending(stream, group string) *XPendingCmd
+ XPendingExt(a *XPendingExtArgs) *XPendingExtCmd
+ XClaim(a *XClaimArgs) *XMessageSliceCmd
+ XClaimJustID(a *XClaimArgs) *StringSliceCmd
+ XTrim(key string, maxLen int64) *IntCmd
+ XTrimApprox(key string, maxLen int64) *IntCmd
+ BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
+ BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
@@ -196,6 +203,8 @@ type Cmdable interface {
ZLexCount(key, min, max string) *IntCmd
ZIncrBy(key string, increment float64, member string) *FloatCmd
ZInterStore(destination string, store ZStore, keys ...string) *IntCmd
+ ZPopMax(key string, count ...int64) *ZSliceCmd
+ ZPopMin(key string, count ...int64) *ZSliceCmd
ZRange(key string, start, stop int64) *StringSliceCmd
ZRangeWithScores(key string, start, stop int64) *ZSliceCmd
ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd
@@ -223,6 +232,7 @@ type Cmdable interface {
ClientKillByFilter(keys ...string) *IntCmd
ClientList() *StringCmd
ClientPause(dur time.Duration) *BoolCmd
+ ClientID() *IntCmd
ConfigGet(parameter string) *SliceCmd
ConfigResetStat() *StatusCmd
ConfigSet(parameter, value string) *StatusCmd
@@ -260,6 +270,7 @@ type Cmdable interface {
ClusterResetHard() *StatusCmd
ClusterInfo() *StringCmd
ClusterKeySlot(key string) *IntCmd
+ ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd
ClusterCountFailureReports(nodeID string) *IntCmd
ClusterCountKeysInSlot(slot int) *IntCmd
ClusterDelSlots(slots ...int) *StatusCmd
@@ -1300,7 +1311,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
//------------------------------------------------------------------------------
-type XAddExt struct {
+type XAddArgs struct {
Stream string
MaxLen int64 // MAXLEN N
MaxLenApprox int64 // MAXLEN ~ N
@@ -1308,40 +1319,42 @@ type XAddExt struct {
Values map[string]interface{}
}
-func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
- a := make([]interface{}, 0, 6+len(opt.Values)*2)
- a = append(a, "xadd")
- a = append(a, opt.Stream)
- if opt.MaxLen > 0 {
- a = append(a, "maxlen", opt.MaxLen)
- } else if opt.MaxLenApprox > 0 {
- a = append(a, "maxlen", "~", opt.MaxLenApprox)
+func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
+ args := make([]interface{}, 0, 6+len(a.Values)*2)
+ args = append(args, "xadd")
+ args = append(args, a.Stream)
+ if a.MaxLen > 0 {
+ args = append(args, "maxlen", a.MaxLen)
+ } else if a.MaxLenApprox > 0 {
+ args = append(args, "maxlen", "~", a.MaxLenApprox)
}
- if opt.ID != "" {
- a = append(a, opt.ID)
+ if a.ID != "" {
+ args = append(args, a.ID)
} else {
- a = append(a, "*")
+ args = append(args, "*")
}
- for k, v := range opt.Values {
- a = append(a, k)
- a = append(a, v)
+ for k, v := range a.Values {
+ args = append(args, k)
+ args = append(args, v)
}
- cmd := NewStringCmd(a...)
+ cmd := NewStringCmd(args...)
c.process(cmd)
return cmd
}
-func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
- return c.XAddExt(&XAddExt{
- Stream: stream,
- ID: id,
- Values: values,
- })
+func (c *cmdable) XDel(stream string, ids ...string) *IntCmd {
+ args := []interface{}{"xdel", stream}
+ for _, id := range ids {
+ args = append(args, id)
+ }
+ cmd := NewIntCmd(args...)
+ c.process(cmd)
+ return cmd
}
-func (c *cmdable) XLen(key string) *IntCmd {
- cmd := NewIntCmd("xlen", key)
+func (c *cmdable) XLen(stream string) *IntCmd {
+ cmd := NewIntCmd("xlen", stream)
c.process(cmd)
return cmd
}
@@ -1370,55 +1383,190 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS
return cmd
}
-type XReadExt struct {
+type XReadArgs struct {
Streams []string
Count int64
Block time.Duration
}
-func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
- a := make([]interface{}, 0, 5+len(opt.Streams))
- a = append(a, "xread")
- if opt != nil {
- if opt.Count > 0 {
- a = append(a, "count")
- a = append(a, opt.Count)
- }
- if opt.Block >= 0 {
- a = append(a, "block")
- a = append(a, int64(opt.Block/time.Millisecond))
- }
+func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
+ args := make([]interface{}, 0, 5+len(a.Streams))
+ args = append(args, "xread")
+ if a.Count > 0 {
+ args = append(args, "count")
+ args = append(args, a.Count)
+ }
+ if a.Block >= 0 {
+ args = append(args, "block")
+ args = append(args, int64(a.Block/time.Millisecond))
}
- a = append(a, "streams")
- for _, s := range opt.Streams {
- a = append(a, s)
+ args = append(args, "streams")
+ for _, s := range a.Streams {
+ args = append(args, s)
}
- cmd := NewXStreamSliceCmd(a...)
+ cmd := NewXStreamSliceCmd(args...)
+ if a.Block >= 0 {
+ cmd.setReadTimeout(a.Block)
+ }
c.process(cmd)
return cmd
}
-func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
- return c.XReadExt(&XReadExt{
+func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd {
+ return c.XRead(&XReadArgs{
Streams: streams,
Block: -1,
})
}
-func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
- return c.XReadExt(&XReadExt{
- Streams: streams,
- Count: count,
- Block: -1,
- })
+func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
+ cmd := NewStatusCmd("xgroup", "create", stream, group, start)
+ c.process(cmd)
+ return cmd
}
-func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
- return c.XReadExt(&XReadExt{
- Streams: streams,
- Block: block,
- })
+func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
+ cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
+ cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd {
+ cmd := NewIntCmd("xgroup", "destroy", stream, group)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
+ cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer)
+ c.process(cmd)
+ return cmd
+}
+
+type XReadGroupArgs struct {
+ Group string
+ Consumer string
+ // List of streams and ids.
+ Streams []string
+ Count int64
+ Block time.Duration
+ NoAck bool
+}
+
+func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
+ args := make([]interface{}, 0, 8+len(a.Streams))
+ args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
+ if a.Count > 0 {
+ args = append(args, "count", a.Count)
+ }
+ if a.Block >= 0 {
+ args = append(args, "block", int64(a.Block/time.Millisecond))
+ }
+ if a.NoAck {
+ args = append(args, "noack")
+ }
+ args = append(args, "streams")
+ for _, s := range a.Streams {
+ args = append(args, s)
+ }
+
+ cmd := NewXStreamSliceCmd(args...)
+ if a.Block >= 0 {
+ cmd.setReadTimeout(a.Block)
+ }
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd {
+ args := []interface{}{"xack", stream, group}
+ for _, id := range ids {
+ args = append(args, id)
+ }
+ cmd := NewIntCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XPending(stream, group string) *XPendingCmd {
+ cmd := NewXPendingCmd("xpending", stream, group)
+ c.process(cmd)
+ return cmd
+}
+
+type XPendingExtArgs struct {
+ Stream string
+ Group string
+ Start string
+ End string
+ Count int64
+ Consumer string
+}
+
+func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd {
+ args := make([]interface{}, 0, 7)
+ args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count)
+ if a.Consumer != "" {
+ args = append(args, a.Consumer)
+ }
+ cmd := NewXPendingExtCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+type XClaimArgs struct {
+ Stream string
+ Group string
+ Consumer string
+ MinIdle time.Duration
+ Messages []string
+}
+
+func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd {
+ args := xClaimArgs(a)
+ cmd := NewXMessageSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd {
+ args := xClaimArgs(a)
+ args = append(args, "justid")
+ cmd := NewStringSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func xClaimArgs(a *XClaimArgs) []interface{} {
+ args := make([]interface{}, 0, 4+len(a.Messages))
+ args = append(args,
+ "xclaim",
+ a.Stream,
+ a.Group, a.Consumer,
+ int64(a.MinIdle/time.Millisecond))
+ for _, id := range a.Messages {
+ args = append(args, id)
+ }
+ return args
+}
+
+func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd {
+ cmd := NewIntCmd("xtrim", key, "maxlen", maxLen)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd {
+ cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen)
+ c.process(cmd)
+ return cmd
}
//------------------------------------------------------------------------------
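
Taken together, the renamed `XAddArgs`/`XReadArgs` structs and the new consumer-group commands cover a full produce/acknowledge cycle. A sketch, assuming a local server, a `*redis.Client` named `client`, and the usual `fmt`/`time` imports:

    // Produce an entry; leaving ID empty lets the server assign one ("*").
    id, err := client.XAdd(&redis.XAddArgs{
        Stream: "events",
        Values: map[string]interface{}{"type": "click"},
    }).Result()
    if err != nil {
        panic(err)
    }
    fmt.Println("added", id)

    // The mkstream variant creates the stream if it does not exist yet.
    _ = client.XGroupCreateMkStream("events", "workers", "0").Err()

    streams, err := client.XReadGroup(&redis.XReadGroupArgs{
        Group:    "workers",
        Consumer: "worker-1",
        Streams:  []string{"events", ">"}, // ">" = only never-delivered entries
        Block:    time.Second,             // also becomes the read timeout, as above
    }).Result()
    if err == nil {
        for _, s := range streams {
            for _, msg := range s.Messages {
                _ = client.XAck("events", "workers", msg.ID).Err()
            }
        }
    }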
@@ -1429,6 +1577,12 @@ type Z struct {
Member interface{}
}
+// ZWithKey represents a sorted set member along with the name of the key it was popped from.
+type ZWithKey struct {
+ Z
+ Key string
+}
+
// ZStore is used as an arg to ZInterStore and ZUnionStore.
type ZStore struct {
Weights []float64
@@ -1436,6 +1590,34 @@ type ZStore struct {
Aggregate string
}
+// Redis `BZPOPMAX key [key ...] timeout` command.
+func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
+ args := make([]interface{}, 1+len(keys)+1)
+ args[0] = "bzpopmax"
+ for i, key := range keys {
+ args[1+i] = key
+ }
+ args[len(args)-1] = formatSec(timeout)
+ cmd := NewZWithKeyCmd(args...)
+ cmd.setReadTimeout(timeout)
+ c.process(cmd)
+ return cmd
+}
+
+// Redis `BZPOPMIN key [key ...] timeout` command.
+func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
+ args := make([]interface{}, 1+len(keys)+1)
+ args[0] = "bzpopmin"
+ for i, key := range keys {
+ args[1+i] = key
+ }
+ args[len(args)-1] = formatSec(timeout)
+ cmd := NewZWithKeyCmd(args...)
+ cmd.setReadTimeout(timeout)
+ c.process(cmd)
+ return cmd
+}
+
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
for i, m := range members {
a[n+2*i] = m.Score
@@ -1574,6 +1756,46 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string)
return cmd
}
+func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd {
+ args := []interface{}{
+ "zpopmax",
+ key,
+ }
+
+ switch len(count) {
+ case 0:
+ break
+ case 1:
+ args = append(args, count[0])
+ default:
+ panic("too many arguments")
+ }
+
+ cmd := NewZSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd {
+ args := []interface{}{
+ "zpopmin",
+ key,
+ }
+
+ switch len(count) {
+ case 0:
+ break
+ case 1:
+ args = append(args, count[0])
+ default:
+ panic("too many arguments")
+ }
+
+ cmd := NewZSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
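
ZPopMax/ZPopMin accept at most one optional count (a second one panics), and the blocking BZPOP* variants added earlier report which key produced the member. A sketch, assuming `client` as before:

    client.ZAdd("queue", redis.Z{Score: 1, Member: "low"}, redis.Z{Score: 9, Member: "high"})

    top, err := client.ZPopMax("queue", 1).Result()
    if err == nil && len(top) > 0 {
        fmt.Println(top[0].Member, top[0].Score) // high 9
    }

    // Blocks up to 2s across several keys; ZWithKey.Key names the source key.
    zk, err := client.BZPopMin(2*time.Second, "queue", "backup").Result()
    if err == nil {
        fmt.Println(zk.Key, zk.Member, zk.Score)
    }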
func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd {
args := []interface{}{
"zrange",
@@ -1849,6 +2071,24 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
return cmd
}
+func (c *cmdable) ClientID() *IntCmd {
+ cmd := NewIntCmd("client", "id")
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) ClientUnblock(id int64) *IntCmd {
+ cmd := NewIntCmd("client", "unblock", id)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd {
+ cmd := NewIntCmd("client", "unblock", id, "error")
+ c.process(cmd)
+ return cmd
+}
+
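
CLIENT ID pairs with the CLIENT UNBLOCK helpers right above it: one connection records its id and another can interrupt its blocking command. With a pooled client the id is per-connection, so this sketch (with `client` and `other` as two separate `*redis.Client` instances) is illustrative only:

    id, err := client.ClientID().Result()
    if err != nil {
        panic(err)
    }
    // Later, from the second client, while the first connection is
    // stuck in a blocking command such as BLPOP:
    unblocked, err := other.ClientUnblock(id).Result()
    fmt.Println(unblocked, err) // 1 if a blocked command was interrupted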
// ClientSetName assigns a name to the connection.
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
cmd := NewBoolCmd("client", "setname", name)
@@ -2164,6 +2404,12 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd {
return cmd
}
+func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd {
+ cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count)
+ c.process(cmd)
+ return cmd
+}
+
func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd {
cmd := NewIntCmd("cluster", "count-failure-reports", nodeID)
c.process(cmd)
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/error.go b/src/dma/vendor/github.com/go-redis/redis/internal/error.go
index e0ff8632..34f6bd4d 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/error.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/error.go
@@ -8,9 +8,18 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-func IsRetryableError(err error, retryNetError bool) bool {
- if IsNetworkError(err) {
- return retryNetError
+func IsRetryableError(err error, retryTimeout bool) bool {
+ if err == nil {
+ return false
+ }
+ if err == io.EOF {
+ return true
+ }
+ if netErr, ok := err.(net.Error); ok {
+ if netErr.Timeout() {
+ return retryTimeout
+ }
+ return true
}
s := err.Error()
if s == "ERR max number of clients reached" {
@@ -33,20 +42,13 @@ func IsRedisError(err error) bool {
return ok
}
-func IsNetworkError(err error) bool {
- if err == io.EOF {
- return true
- }
- _, ok := err.(net.Error)
- return ok
-}
-
func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
if IsRedisError(err) {
- return strings.HasPrefix(err.Error(), "READONLY ")
+ // #790
+ return IsReadOnlyError(err)
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@@ -81,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
+
+func IsReadOnlyError(err error) bool {
+ return strings.HasPrefix(err.Error(), "READONLY ")
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
index acaf3665..1095bfe5 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
@@ -13,19 +13,21 @@ var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
- Rd *proto.Reader
- Wb *proto.WriteBuffer
+ rd *proto.Reader
+ rdLocked bool
+ wr *proto.Writer
- Inited bool
- usedAt atomic.Value
+ InitedAt time.Time
+ pooled bool
+ usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
- Wb: proto.NewWriteBuffer(),
}
- cn.Rd = proto.NewReader(cn.netConn)
+ cn.rd = proto.NewReader(netConn)
+ cn.wr = proto.NewWriter(netConn)
cn.SetUsedAt(time.Now())
return cn
}
@@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
- cn.Rd.Reset(netConn)
+ cn.rd.Reset(netConn)
+ cn.wr.Reset(netConn)
}
-func (cn *Conn) IsStale(timeout time.Duration) bool {
- return timeout > 0 && time.Since(cn.UsedAt()) > timeout
-}
-
-func (cn *Conn) SetReadTimeout(timeout time.Duration) {
+func (cn *Conn) setReadTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
- cn.netConn.SetReadDeadline(now.Add(timeout))
- } else {
- cn.netConn.SetReadDeadline(noDeadline)
+ return cn.netConn.SetReadDeadline(now.Add(timeout))
}
+ return cn.netConn.SetReadDeadline(noDeadline)
}
-func (cn *Conn) SetWriteTimeout(timeout time.Duration) {
+func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
- cn.netConn.SetWriteDeadline(now.Add(timeout))
- } else {
- cn.netConn.SetWriteDeadline(noDeadline)
+ return cn.netConn.SetWriteDeadline(now.Add(timeout))
}
+ return cn.netConn.SetWriteDeadline(noDeadline)
}
func (cn *Conn) Write(b []byte) (int, error) {
@@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr()
}
+func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
+ _ = cn.setReadTimeout(timeout)
+ return fn(cn.rd)
+}
+
+func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
+ _ = cn.setWriteTimeout(timeout)
+
+ firstErr := fn(cn.wr)
+ err := cn.wr.Flush()
+ if err != nil && firstErr == nil {
+ firstErr = err
+ }
+ return firstErr
+}
+
func (cn *Conn) Close() error {
return cn.netConn.Close()
}
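`WithWriter` sets the write deadline, hands the buffered writer to the callback, and always flushes, preferring the callback's error over the flush error. A standalone sketch of that "first error wins, but always flush" pattern (not the internal API itself):

    package main

    import (
        "bufio"
        "bytes"
        "fmt"
    )

    // withWriter mirrors the pattern above: run fn against a buffered
    // writer, always flush, and report fn's error in preference to the
    // flush error.
    func withWriter(w *bufio.Writer, fn func(*bufio.Writer) error) error {
        firstErr := fn(w)
        if err := w.Flush(); err != nil && firstErr == nil {
            firstErr = err
        }
        return firstErr
    }

    func main() {
        var buf bytes.Buffer
        wr := bufio.NewWriter(&buf)
        err := withWriter(wr, func(w *bufio.Writer) error {
            _, err := w.WriteString("PING\r\n")
            return err
        })
        fmt.Printf("%q %v\n", buf.String(), err)
    }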
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
index cab66904..9cecee8a 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -28,7 +28,6 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
- FreeConns uint32 // deprecated - use IdleConns
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@@ -53,6 +52,8 @@ type Options struct {
OnClose func(*Conn) error
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -63,16 +64,16 @@ type ConnPool struct {
dialErrorsNum uint32 // atomic
- lastDialError error
lastDialErrorMu sync.RWMutex
+ lastDialError error
queue chan struct{}
- connsMu sync.Mutex
- conns []*Conn
-
- idleConnsMu sync.RWMutex
- idleConns []*Conn
+ connsMu sync.Mutex
+ conns []*Conn
+ idleConns []*Conn
+ poolSize int
+ idleConnsLen int
stats Stats
@@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool {
idleConns: make([]*Conn, 0, opt.PoolSize),
}
+ for i := 0; i < opt.MinIdleConns; i++ {
+ p.checkMinIdleConns()
+ }
+
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
@@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool {
return p
}
+func (p *ConnPool) checkMinIdleConns() {
+ if p.opt.MinIdleConns == 0 {
+ return
+ }
+ if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
+ p.poolSize++
+ p.idleConnsLen++
+ go p.addIdleConn()
+ }
+}
+
+func (p *ConnPool) addIdleConn() {
+ cn, err := p.newConn(true)
+ if err != nil {
+ return
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.idleConns = append(p.idleConns, cn)
+ p.connsMu.Unlock()
+}
+
func (p *ConnPool) NewConn() (*Conn, error) {
- cn, err := p.newConn()
+ return p._NewConn(false)
+}
+
+func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
+ cn, err := p.newConn(pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
+ if pooled {
+ if p.poolSize < p.opt.PoolSize {
+ p.poolSize++
+ } else {
+ cn.pooled = false
+ }
+ }
p.connsMu.Unlock()
return cn, nil
}
-func (p *ConnPool) newConn() (*Conn, error) {
+func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
@@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) {
return nil, err
}
- return NewConn(netConn), nil
+ cn := NewConn(netConn)
+ cn.pooled = pooled
+ return cn, nil
}
func (p *ConnPool) tryDial() {
@@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) {
}
for {
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
cn := p.popIdle()
- p.idleConnsMu.Unlock()
+ p.connsMu.Unlock()
if cn == nil {
break
}
- if cn.IsStale(p.opt.IdleTimeout) {
- p.CloseConn(cn)
+ if p.isStaleConn(cn) {
+ _ = p.CloseConn(cn)
continue
}
@@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) {
atomic.AddUint32(&p.stats.Misses, 1)
- newcn, err := p.NewConn()
+ newcn, err := p._NewConn(true)
if err != nil {
p.freeTurn()
return nil, err
@@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn {
idx := len(p.idleConns) - 1
cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx]
-
+ p.idleConnsLen--
+ p.checkMinIdleConns()
return cn
}
func (p *ConnPool) Put(cn *Conn) {
- buf := cn.Rd.PeekBuffered()
- if buf != nil {
- internal.Logf("connection has unread data: %.100q", buf)
+ if !cn.pooled {
p.Remove(cn)
return
}
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
- p.idleConnsMu.Unlock()
+ p.idleConnsLen++
+ p.connsMu.Unlock()
p.freeTurn()
}
@@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns {
if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ if cn.pooled {
+ p.poolSize--
+ p.checkMinIdleConns()
+ }
break
}
}
@@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error {
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
- l := len(p.conns)
+ n := len(p.conns)
p.connsMu.Unlock()
- return l
+ return n
}
-// FreeLen returns number of idle connections.
+// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
- p.idleConnsMu.RLock()
- l := len(p.idleConns)
- p.idleConnsMu.RUnlock()
- return l
+ p.connsMu.Lock()
+ n := p.idleConnsLen
+ p.connsMu.Unlock()
+ return n
}
func (p *ConnPool) Stats() *Stats {
@@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats {
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
- FreeConns: uint32(idleLen),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
@@ -349,11 +393,10 @@ func (p *ConnPool) Close() error {
}
}
p.conns = nil
- p.connsMu.Unlock()
-
- p.idleConnsMu.Lock()
+ p.poolSize = 0
p.idleConns = nil
- p.idleConnsMu.Unlock()
+ p.idleConnsLen = 0
+ p.connsMu.Unlock()
return firstErr
}
@@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn {
}
cn := p.idleConns[0]
- if !cn.IsStale(p.opt.IdleTimeout) {
+ if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+ p.idleConnsLen--
return cn
}
@@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
for {
p.getTurn()
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
cn := p.reapStaleConn()
- p.idleConnsMu.Unlock()
+ p.connsMu.Unlock()
if cn != nil {
p.removeConn(cn)
@@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) {
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
}
}
+
+func (p *ConnPool) isStaleConn(cn *Conn) bool {
+ if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
+ return false
+ }
+
+ now := time.Now()
+ if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
+ return true
+ }
+ if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
+ return true
+ }
+
+ return false
+}
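
The new pool accounting backs two public options added in this release, `MinIdleConns` and `MaxConnAge`; together with the existing `IdleTimeout` they drive `isStaleConn`. Wiring them up, as a sketch:

    client := redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",
        PoolSize:     20,
        MinIdleConns: 5,                // keep at least 5 pre-dialed idle conns
        MaxConnAge:   30 * time.Minute, // retire connections older than 30m
        IdleTimeout:  5 * time.Minute,  // reap connections idle for 5m
    })
    defer client.Close()

    // PoolStats.IdleConns replaces the removed FreeConns field.
    fmt.Printf("%+v\n", client.PoolStats())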
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
index 8c28c7b7..896b6f65 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
@@ -9,8 +9,6 @@ import (
"github.com/go-redis/redis/internal/util"
)
-const bytesAllocLimit = 1024 * 1024 // 1mb
-
const (
ErrorReply = '-'
StatusReply = '+'
@@ -32,40 +30,23 @@ func (e RedisError) Error() string { return string(e) }
type MultiBulkParse func(*Reader, int64) (interface{}, error)
type Reader struct {
- src *bufio.Reader
- buf []byte
+ rd *bufio.Reader
+ _buf []byte
}
func NewReader(rd io.Reader) *Reader {
return &Reader{
- src: bufio.NewReader(rd),
- buf: make([]byte, 4096),
+ rd: bufio.NewReader(rd),
+ _buf: make([]byte, 64),
}
}
func (r *Reader) Reset(rd io.Reader) {
- r.src.Reset(rd)
-}
-
-func (r *Reader) PeekBuffered() []byte {
- if n := r.src.Buffered(); n != 0 {
- b, _ := r.src.Peek(n)
- return b
- }
- return nil
-}
-
-func (r *Reader) ReadN(n int) ([]byte, error) {
- b, err := readN(r.src, r.buf, n)
- if err != nil {
- return nil, err
- }
- r.buf = b
- return b, nil
+ r.rd.Reset(rd)
}
func (r *Reader) ReadLine() ([]byte, error) {
- line, isPrefix, err := r.src.ReadLine()
+ line, isPrefix, err := r.rd.ReadLine()
if err != nil {
return nil, err
}
@@ -91,11 +72,11 @@ func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
case ErrorReply:
return nil, ParseErrorReply(line)
case StatusReply:
- return parseTmpStatusReply(line), nil
+ return string(line[1:]), nil
case IntReply:
return util.ParseInt(line[1:], 10, 64)
case StringReply:
- return r.readTmpBytesReply(line)
+ return r.readStringReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
@@ -121,47 +102,42 @@ func (r *Reader) ReadIntReply() (int64, error) {
}
}
-func (r *Reader) ReadTmpBytesReply() ([]byte, error) {
+func (r *Reader) ReadString() (string, error) {
line, err := r.ReadLine()
if err != nil {
- return nil, err
+ return "", err
}
switch line[0] {
case ErrorReply:
- return nil, ParseErrorReply(line)
+ return "", ParseErrorReply(line)
case StringReply:
- return r.readTmpBytesReply(line)
+ return r.readStringReply(line)
case StatusReply:
- return parseTmpStatusReply(line), nil
+ return string(line[1:]), nil
+ case IntReply:
+ return string(line[1:]), nil
default:
- return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
+ return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
}
}
-func (r *Reader) ReadBytesReply() ([]byte, error) {
- b, err := r.ReadTmpBytesReply()
- if err != nil {
- return nil, err
+func (r *Reader) readStringReply(line []byte) (string, error) {
+ if isNilReply(line) {
+ return "", Nil
}
- cp := make([]byte, len(b))
- copy(cp, b)
- return cp, nil
-}
-func (r *Reader) ReadStringReply() (string, error) {
- b, err := r.ReadTmpBytesReply()
+ replyLen, err := strconv.Atoi(string(line[1:]))
if err != nil {
return "", err
}
- return string(b), nil
-}
-func (r *Reader) ReadFloatReply() (float64, error) {
- b, err := r.ReadTmpBytesReply()
+ b := make([]byte, replyLen+2)
+ _, err = io.ReadFull(r.rd, b)
if err != nil {
- return 0, err
+ return "", err
}
- return util.ParseFloat(b, 64)
+
+ return util.BytesToString(b[:replyLen]), nil
}
func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
@@ -219,7 +195,7 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) {
keys := make([]string, n)
for i := int64(0); i < n; i++ {
- key, err := r.ReadStringReply()
+ key, err := r.ReadString()
if err != nil {
return nil, 0, err
}
@@ -229,69 +205,71 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) {
return keys, cursor, err
}
-func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) {
- if isNilReply(line) {
- return nil, Nil
- }
-
- replyLen, err := strconv.Atoi(string(line[1:]))
+func (r *Reader) ReadInt() (int64, error) {
+ b, err := r.readTmpBytesReply()
if err != nil {
- return nil, err
+ return 0, err
}
+ return util.ParseInt(b, 10, 64)
+}
- b, err := r.ReadN(replyLen + 2)
+func (r *Reader) ReadUint() (uint64, error) {
+ b, err := r.readTmpBytesReply()
if err != nil {
- return nil, err
+ return 0, err
}
- return b[:replyLen], nil
+ return util.ParseUint(b, 10, 64)
}
-func (r *Reader) ReadInt() (int64, error) {
- b, err := r.ReadTmpBytesReply()
+func (r *Reader) ReadFloatReply() (float64, error) {
+ b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
- return util.ParseInt(b, 10, 64)
+ return util.ParseFloat(b, 64)
}
-func (r *Reader) ReadUint() (uint64, error) {
- b, err := r.ReadTmpBytesReply()
+func (r *Reader) readTmpBytesReply() ([]byte, error) {
+ line, err := r.ReadLine()
if err != nil {
- return 0, err
+ return nil, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case StringReply:
+ return r._readTmpBytesReply(line)
+ case StatusReply:
+ return line[1:], nil
+ default:
+ return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
}
- return util.ParseUint(b, 10, 64)
}
-// --------------------------------------------------------------------
+func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
+ if isNilReply(line) {
+ return nil, Nil
+ }
-func readN(r io.Reader, b []byte, n int) ([]byte, error) {
- if n == 0 && b == nil {
- return make([]byte, 0), nil
+ replyLen, err := strconv.Atoi(string(line[1:]))
+ if err != nil {
+ return nil, err
}
- if cap(b) >= n {
- b = b[:n]
- _, err := io.ReadFull(r, b)
- return b, err
+ buf := r.buf(replyLen + 2)
+ _, err = io.ReadFull(r.rd, buf)
+ if err != nil {
+ return nil, err
}
- b = b[:cap(b)]
- pos := 0
- for pos < n {
- diff := n - len(b)
- if diff > bytesAllocLimit {
- diff = bytesAllocLimit
- }
- b = append(b, make([]byte, diff)...)
+ return buf[:replyLen], nil
+}
- nn, err := io.ReadFull(r, b[pos:])
- if err != nil {
- return nil, err
- }
- pos += nn
+func (r *Reader) buf(n int) []byte {
+ if d := n - cap(r._buf); d > 0 {
+ r._buf = append(r._buf, make([]byte, d)...)
}
-
- return b, nil
+ return r._buf[:n]
}
func isNilReply(b []byte) bool {
@@ -304,10 +282,6 @@ func ParseErrorReply(line []byte) error {
return RedisError(string(line[1:]))
}
-func parseTmpStatusReply(line []byte) []byte {
- return line[1:]
-}
-
func parseArrayLen(line []byte) (int64, error) {
if isNilReply(line) {
return 0, Nil
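`Reader.buf` grows a single scratch slice on demand and reslices it per reply, so bulk strings are read without a fresh allocation each time. The same idiom in isolation, as a sketch:

    // grow returns a length-n slice backed by a reusable buffer,
    // extending capacity only when it falls short (cf. Reader.buf).
    func grow(buf []byte, n int) []byte {
        if d := n - cap(buf); d > 0 {
            buf = append(buf[:cap(buf)], make([]byte, d)...)
        }
        return buf[:n]
    }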
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
deleted file mode 100644
index 664f4c33..00000000
--- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package proto
-
-import (
- "encoding"
- "fmt"
- "strconv"
-)
-
-type WriteBuffer struct {
- b []byte
-}
-
-func NewWriteBuffer() *WriteBuffer {
- return &WriteBuffer{
- b: make([]byte, 0, 4096),
- }
-}
-
-func (w *WriteBuffer) Len() int { return len(w.b) }
-func (w *WriteBuffer) Bytes() []byte { return w.b }
-func (w *WriteBuffer) Reset() { w.b = w.b[:0] }
-
-func (w *WriteBuffer) Append(args []interface{}) error {
- w.b = append(w.b, ArrayReply)
- w.b = strconv.AppendUint(w.b, uint64(len(args)), 10)
- w.b = append(w.b, '\r', '\n')
-
- for _, arg := range args {
- if err := w.append(arg); err != nil {
- return err
- }
- }
- return nil
-}
-
-func (w *WriteBuffer) append(val interface{}) error {
- switch v := val.(type) {
- case nil:
- w.AppendString("")
- case string:
- w.AppendString(v)
- case []byte:
- w.AppendBytes(v)
- case int:
- w.AppendString(formatInt(int64(v)))
- case int8:
- w.AppendString(formatInt(int64(v)))
- case int16:
- w.AppendString(formatInt(int64(v)))
- case int32:
- w.AppendString(formatInt(int64(v)))
- case int64:
- w.AppendString(formatInt(v))
- case uint:
- w.AppendString(formatUint(uint64(v)))
- case uint8:
- w.AppendString(formatUint(uint64(v)))
- case uint16:
- w.AppendString(formatUint(uint64(v)))
- case uint32:
- w.AppendString(formatUint(uint64(v)))
- case uint64:
- w.AppendString(formatUint(v))
- case float32:
- w.AppendString(formatFloat(float64(v)))
- case float64:
- w.AppendString(formatFloat(v))
- case bool:
- if v {
- w.AppendString("1")
- } else {
- w.AppendString("0")
- }
- case encoding.BinaryMarshaler:
- b, err := v.MarshalBinary()
- if err != nil {
- return err
- }
- w.AppendBytes(b)
- default:
- return fmt.Errorf(
- "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val)
- }
- return nil
-}
-
-func (w *WriteBuffer) AppendString(s string) {
- w.b = append(w.b, StringReply)
- w.b = strconv.AppendUint(w.b, uint64(len(s)), 10)
- w.b = append(w.b, '\r', '\n')
- w.b = append(w.b, s...)
- w.b = append(w.b, '\r', '\n')
-}
-
-func (w *WriteBuffer) AppendBytes(p []byte) {
- w.b = append(w.b, StringReply)
- w.b = strconv.AppendUint(w.b, uint64(len(p)), 10)
- w.b = append(w.b, '\r', '\n')
- w.b = append(w.b, p...)
- w.b = append(w.b, '\r', '\n')
-}
-
-func formatInt(n int64) string {
- return strconv.FormatInt(n, 10)
-}
-
-func formatUint(u uint64) string {
- return strconv.FormatUint(u, 10)
-}
-
-func formatFloat(f float64) string {
- return strconv.FormatFloat(f, 'f', -1, 64)
-}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go
new file mode 100644
index 00000000..d106ce0e
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go
@@ -0,0 +1,159 @@
+package proto
+
+import (
+ "bufio"
+ "encoding"
+ "fmt"
+ "io"
+ "strconv"
+
+ "github.com/go-redis/redis/internal/util"
+)
+
+type Writer struct {
+ wr *bufio.Writer
+
+ lenBuf []byte
+ numBuf []byte
+}
+
+func NewWriter(wr io.Writer) *Writer {
+ return &Writer{
+ wr: bufio.NewWriter(wr),
+
+ lenBuf: make([]byte, 64),
+ numBuf: make([]byte, 64),
+ }
+}
+
+func (w *Writer) WriteArgs(args []interface{}) error {
+ err := w.wr.WriteByte(ArrayReply)
+ if err != nil {
+ return err
+ }
+
+ err = w.writeLen(len(args))
+ if err != nil {
+ return err
+ }
+
+ for _, arg := range args {
+ err := w.writeArg(arg)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (w *Writer) writeLen(n int) error {
+ w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10)
+ w.lenBuf = append(w.lenBuf, '\r', '\n')
+ _, err := w.wr.Write(w.lenBuf)
+ return err
+}
+
+func (w *Writer) writeArg(v interface{}) error {
+ switch v := v.(type) {
+ case nil:
+ return w.string("")
+ case string:
+ return w.string(v)
+ case []byte:
+ return w.bytes(v)
+ case int:
+ return w.int(int64(v))
+ case int8:
+ return w.int(int64(v))
+ case int16:
+ return w.int(int64(v))
+ case int32:
+ return w.int(int64(v))
+ case int64:
+ return w.int(v)
+ case uint:
+ return w.uint(uint64(v))
+ case uint8:
+ return w.uint(uint64(v))
+ case uint16:
+ return w.uint(uint64(v))
+ case uint32:
+ return w.uint(uint64(v))
+ case uint64:
+ return w.uint(v)
+ case float32:
+ return w.float(float64(v))
+ case float64:
+ return w.float(v)
+ case bool:
+ if v {
+ return w.int(1)
+ } else {
+ return w.int(0)
+ }
+ case encoding.BinaryMarshaler:
+ b, err := v.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ return w.bytes(b)
+ default:
+ return fmt.Errorf(
+ "redis: can't marshal %T (implement encoding.BinaryMarshaler)", v)
+ }
+}
+
+func (w *Writer) bytes(b []byte) error {
+ err := w.wr.WriteByte(StringReply)
+ if err != nil {
+ return err
+ }
+
+ err = w.writeLen(len(b))
+ if err != nil {
+ return err
+ }
+
+ _, err = w.wr.Write(b)
+ if err != nil {
+ return err
+ }
+
+ return w.crlf()
+}
+
+func (w *Writer) string(s string) error {
+ return w.bytes(util.StringToBytes(s))
+}
+
+func (w *Writer) uint(n uint64) error {
+ w.numBuf = strconv.AppendUint(w.numBuf[:0], n, 10)
+ return w.bytes(w.numBuf)
+}
+
+func (w *Writer) int(n int64) error {
+ w.numBuf = strconv.AppendInt(w.numBuf[:0], n, 10)
+ return w.bytes(w.numBuf)
+}
+
+func (w *Writer) float(f float64) error {
+ w.numBuf = strconv.AppendFloat(w.numBuf[:0], f, 'f', -1, 64)
+ return w.bytes(w.numBuf)
+}
+
+func (w *Writer) crlf() error {
+ err := w.wr.WriteByte('\r')
+ if err != nil {
+ return err
+ }
+ return w.wr.WriteByte('\n')
+}
+
+func (w *Writer) Reset(wr io.Writer) {
+ w.wr.Reset(wr)
+}
+
+func (w *Writer) Flush() error {
+ return w.wr.Flush()
+}
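
The new Writer streams RESP directly into a `bufio.Writer` instead of staging the whole command in a byte slice. Every argument goes out as a bulk string after an array header; since `internal/proto` is not importable, this sketch re-derives the format to show what `WriteArgs` puts on the wire:

    package main

    import (
        "bytes"
        "fmt"
        "strconv"
    )

    // respEncode renders args the way Writer.WriteArgs does: an array
    // header, then each argument as a $<len> bulk string.
    func respEncode(args ...string) []byte {
        var b bytes.Buffer
        b.WriteByte('*')
        b.WriteString(strconv.Itoa(len(args)))
        b.WriteString("\r\n")
        for _, a := range args {
            b.WriteByte('$')
            b.WriteString(strconv.Itoa(len(a)))
            b.WriteString("\r\n")
            b.WriteString(a)
            b.WriteString("\r\n")
        }
        return b.Bytes()
    }

    func main() {
        fmt.Printf("%q\n", respEncode("SET", "k", "v"))
        // "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"
    }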
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go
deleted file mode 100644
index 3b174172..00000000
--- a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-Copyright 2013 Google Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Package singleflight provides a duplicate function call suppression
-// mechanism.
-package singleflight
-
-import "sync"
-
-// call is an in-flight or completed Do call
-type call struct {
- wg sync.WaitGroup
- val interface{}
- err error
-}
-
-// Group represents a class of work and forms a namespace in which
-// units of work can be executed with duplicate suppression.
-type Group struct {
- mu sync.Mutex // protects m
- m map[string]*call // lazily initialized
-}
-
-// Do executes and returns the results of the given function, making
-// sure that only one execution is in-flight for a given key at a
-// time. If a duplicate comes in, the duplicate caller waits for the
-// original to complete and receives the same results.
-func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
- g.mu.Lock()
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- if c, ok := g.m[key]; ok {
- g.mu.Unlock()
- c.wg.Wait()
- return c.val, c.err
- }
- c := new(call)
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
-
- c.val, c.err = fn()
- c.wg.Done()
-
- g.mu.Lock()
- delete(g.m, key)
- g.mu.Unlock()
-
- return c.val, c.err
-}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
index cd891833..1b3060eb 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
@@ -5,3 +5,7 @@ package util
func BytesToString(b []byte) string {
return string(b)
}
+
+func StringToBytes(s string) []byte {
+ return []byte(s)
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
index 93a89c55..c9868aac 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
@@ -10,3 +10,13 @@ import (
func BytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
+
+// StringToBytes converts a string to a byte slice without copying.
+func StringToBytes(s string) []byte {
+ return *(*[]byte)(unsafe.Pointer(
+ &struct {
+ string
+ Cap int
+ }{s, len(s)},
+ ))
+}
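
The unsafe variant aliases the string's backing array instead of copying, so the returned slice must be treated as read-only; mutating it is undefined behavior. The contract, as a sketch (the helper lives in an internal package, shown here for illustration only):

    b := util.StringToBytes("hello") // no copy: b aliases the string's bytes
    _ = b[0]                         // reads are fine
    // b[0] = 'H'                    // never do this: strings are immutable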
diff --git a/src/dma/vendor/github.com/go-redis/redis/options.go b/src/dma/vendor/github.com/go-redis/redis/options.go
index 8a82d590..b6fabf3f 100644
--- a/src/dma/vendor/github.com/go-redis/redis/options.go
+++ b/src/dma/vendor/github.com/go-redis/redis/options.go
@@ -14,6 +14,17 @@ import (
"github.com/go-redis/redis/internal/pool"
)
+// Limiter is the interface of a rate limiter or a circuit breaker.
+type Limiter interface {
+	// Allow returns nil if the operation is allowed, or an error otherwise.
+	// If the operation is allowed, the client must report its result,
+	// whether it is a success or a failure.
+ Allow() error
+ // ReportResult reports the result of previously allowed operation.
+ // nil indicates a success, non-nil error indicates a failure.
+ ReportResult(result error)
+}
+
type Options struct {
// The network type, either tcp or unix.
// Default is tcp.
@@ -48,7 +59,7 @@ type Options struct {
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads. If reached, commands will fail
- // with a timeout instead of blocking.
+	// with a timeout instead of blocking. Use -1 for no timeout and 0 for the default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes. If reached, commands will fail
@@ -59,6 +70,12 @@ type Options struct {
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
+	// Minimum number of idle connections, which is useful when establishing
+	// new connections is slow.
+ MinIdleConns int
+	// Connection age at which the client retires (closes) the connection.
+ // Default is to not close aged connections.
+ MaxConnAge time.Duration
// Amount of time client waits for connection if all connections
// are busy before returning an error.
// Default is ReadTimeout + 1 second.
@@ -69,7 +86,8 @@ type Options struct {
IdleTimeout time.Duration
// Frequency of idle checks made by idle connections reaper.
// Default is 1 minute. -1 disables idle connections reaper,
- // but idle connections are still discarded by the client.
+ // but idle connections are still discarded by the client
+ // if IdleTimeout is set.
IdleCheckFrequency time.Duration
// Enables read only queries on slave nodes.
@@ -83,6 +101,9 @@ func (opt *Options) init() {
if opt.Network == "" {
opt.Network = "tcp"
}
+ if opt.Addr == "" {
+ opt.Addr = "localhost:6379"
+ }
if opt.Dialer == nil {
opt.Dialer = func() (net.Conn, error) {
netDialer := &net.Dialer{
@@ -196,6 +217,8 @@ func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{
Dialer: opt.Dialer,
PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
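The `Limiter` interface above is the whole contract: `Allow` gates an operation and `ReportResult` closes the loop (how the client consumes it is not shown in this hunk). A minimal illustrative implementation capping in-flight operations, assuming the `errors` import:

    // maxInflight is an illustrative Limiter: Allow claims a slot,
    // ReportResult releases it, regardless of success or failure.
    type maxInflight struct{ sem chan struct{} }

    func newMaxInflight(n int) *maxInflight {
        return &maxInflight{sem: make(chan struct{}, n)}
    }

    func (l *maxInflight) Allow() error {
        select {
        case l.sem <- struct{}{}:
            return nil
        default:
            return errors.New("too many in-flight operations")
        }
    }

    func (l *maxInflight) ReportResult(result error) {
        <-l.sem // one ReportResult per successful Allow
    }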
diff --git a/src/dma/vendor/github.com/go-redis/redis/parser.go b/src/dma/vendor/github.com/go-redis/redis/parser.go
deleted file mode 100644
index f0dc67f0..00000000
--- a/src/dma/vendor/github.com/go-redis/redis/parser.go
+++ /dev/null
@@ -1,394 +0,0 @@
-package redis
-
-import (
- "fmt"
- "net"
- "strconv"
- "time"
-
- "github.com/go-redis/redis/internal/proto"
-)
-
-// Implements proto.MultiBulkParse
-func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- vals := make([]interface{}, 0, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(sliceParser)
- if err != nil {
- if err == Nil {
- vals = append(vals, nil)
- continue
- }
- if err, ok := err.(proto.RedisError); ok {
- vals = append(vals, err)
- continue
- }
- return nil, err
- }
-
- switch v := v.(type) {
- case []byte:
- vals = append(vals, string(v))
- default:
- vals = append(vals, v)
- }
- }
- return vals, nil
-}
-
-// Implements proto.MultiBulkParse
-func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- bools := make([]bool, 0, n)
- for i := int64(0); i < n; i++ {
- n, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- bools = append(bools, n == 1)
- }
- return bools, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- ss := make([]string, 0, n)
- for i := int64(0); i < n; i++ {
- s, err := rd.ReadStringReply()
- if err == Nil {
- ss = append(ss, "")
- } else if err != nil {
- return nil, err
- } else {
- ss = append(ss, s)
- }
- }
- return ss, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]string, n/2)
- for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- value, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- m[key] = value
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]int64, n/2)
- for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- n, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
-
- m[key] = n
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]struct{}, n)
- for i := int64(0); i < n; i++ {
- key, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- m[key] = struct{}{}
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- zz := make([]Z, n/2)
- for i := int64(0); i < n; i += 2 {
- var err error
-
- z := &zz[i/2]
-
- z.Member, err = rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- z.Score, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- }
- return zz, nil
-}
-
-// Implements proto.MultiBulkParse
-func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
- slots := make([]ClusterSlot, n)
- for i := 0; i < len(slots); i++ {
- n, err := rd.ReadArrayLen()
- if err != nil {
- return nil, err
- }
- if n < 2 {
- err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
- return nil, err
- }
-
- start, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
-
- end, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
-
- nodes := make([]ClusterNode, n-2)
- for j := 0; j < len(nodes); j++ {
- n, err := rd.ReadArrayLen()
- if err != nil {
- return nil, err
- }
- if n != 2 && n != 3 {
- err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
- return nil, err
- }
-
- ip, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- port, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
-
- if n == 3 {
- id, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
- nodes[j].Id = id
- }
- }
-
- slots[i] = ClusterSlot{
- Start: int(start),
- End: int(end),
- Nodes: nodes,
- }
- }
- return slots, nil
-}
-
-func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
- return func(rd *proto.Reader, n int64) (interface{}, error) {
- var loc GeoLocation
- var err error
-
- loc.Name, err = rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
- if q.WithDist {
- loc.Dist, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- }
- if q.WithGeoHash {
- loc.GeoHash, err = rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- }
- if q.WithCoord {
- n, err := rd.ReadArrayLen()
- if err != nil {
- return nil, err
- }
- if n != 2 {
- return nil, fmt.Errorf("got %d coordinates, expected 2", n)
- }
-
- loc.Longitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- loc.Latitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- }
-
- return &loc, nil
- }
-}
-
-func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
- return func(rd *proto.Reader, n int64) (interface{}, error) {
- locs := make([]GeoLocation, 0, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(newGeoLocationParser(q))
- if err != nil {
- return nil, err
- }
- switch vv := v.(type) {
- case []byte:
- locs = append(locs, GeoLocation{
- Name: string(vv),
- })
- case *GeoLocation:
- locs = append(locs, *vv)
- default:
- return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
- }
- }
- return locs, nil
- }
-}
-
-func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
- var pos GeoPos
- var err error
-
- pos.Longitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
-
- pos.Latitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
-
- return &pos, nil
-}
-
-func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- positions := make([]*GeoPos, 0, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(geoPosParser)
- if err != nil {
- if err == Nil {
- positions = append(positions, nil)
- continue
- }
- return nil, err
- }
- switch v := v.(type) {
- case *GeoPos:
- positions = append(positions, v)
- default:
- return nil, fmt.Errorf("got %T, expected *GeoPos", v)
- }
- }
- return positions, nil
-}
-
-func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
- var cmd CommandInfo
- var err error
-
- if n != 6 {
- return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
- }
-
- cmd.Name, err = rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- arity, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.Arity = int8(arity)
-
- flags, err := rd.ReadReply(stringSliceParser)
- if err != nil {
- return nil, err
- }
- cmd.Flags = flags.([]string)
-
- firstKeyPos, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.FirstKeyPos = int8(firstKeyPos)
-
- lastKeyPos, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.LastKeyPos = int8(lastKeyPos)
-
- stepCount, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.StepCount = int8(stepCount)
-
- for _, flag := range cmd.Flags {
- if flag == "readonly" {
- cmd.ReadOnly = true
- break
- }
- }
-
- return &cmd, nil
-}
-
-// Implements proto.MultiBulkParse
-func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]*CommandInfo, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(commandInfoParser)
- if err != nil {
- return nil, err
- }
- vv := v.(*CommandInfo)
- m[vv.Name] = vv
-
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
- if n != 2 {
- return nil, fmt.Errorf("got %d elements, expected 2", n)
- }
-
- sec, err := rd.ReadInt()
- if err != nil {
- return nil, err
- }
-
- microsec, err := rd.ReadInt()
- if err != nil {
- return nil, err
- }
-
- return time.Unix(sec, microsec*1000), nil
-}
diff --git a/src/dma/vendor/github.com/go-redis/redis/pipeline.go b/src/dma/vendor/github.com/go-redis/redis/pipeline.go
index ba852283..b3a8844a 100644
--- a/src/dma/vendor/github.com/go-redis/redis/pipeline.go
+++ b/src/dma/vendor/github.com/go-redis/redis/pipeline.go
@@ -10,6 +10,7 @@ type pipelineExecer func([]Cmder) error
type Pipeliner interface {
StatefulCmdable
+ Do(args ...interface{}) *Cmd
Process(cmd Cmder) error
Close() error
Discard() error
@@ -31,6 +32,12 @@ type Pipeline struct {
closed bool
}
+func (c *Pipeline) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ _ = c.Process(cmd)
+ return cmd
+}
+
// Process queues the cmd for later execution.
func (c *Pipeline) Process(cmd Cmder) error {
c.mu.Lock()
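`Pipeliner` now exposes `Do`, so ad-hoc commands can be queued next to typed ones and collected by `Exec`. A sketch, assuming `client` as before:

    pipe := client.Pipeline()
    pipe.Set("k", "v", 0)
    custom := pipe.Do("memory", "usage", "k") // queued, sent on Exec
    if _, err := pipe.Exec(); err != nil {
        panic(err)
    }
    n, _ := custom.Int64()
    fmt.Println("memory usage:", n)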
diff --git a/src/dma/vendor/github.com/go-redis/redis/pubsub.go b/src/dma/vendor/github.com/go-redis/redis/pubsub.go
index 2cfcd150..0afb47cd 100644
--- a/src/dma/vendor/github.com/go-redis/redis/pubsub.go
+++ b/src/dma/vendor/github.com/go-redis/redis/pubsub.go
@@ -1,15 +1,19 @@
package redis
import (
+ "errors"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/internal"
"github.com/go-redis/redis/internal/pool"
+ "github.com/go-redis/redis/internal/proto"
)
-// PubSub implements Pub/Sub commands as described in
+var errPingTimeout = errors.New("redis: ping timeout")
+
+// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
@@ -46,15 +50,17 @@ func (c *PubSub) conn() (*pool.Conn, error) {
return cn, err
}
-func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
+func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
if c.closed {
return nil, pool.ErrClosed
}
-
if c.cn != nil {
return c.cn, nil
}
+ channels := mapKeys(c.channels)
+ channels = append(channels, newChannels...)
+
cn, err := c.newConn(channels)
if err != nil {
return nil, err
@@ -69,20 +75,24 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
return cn, nil
}
+func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
+ return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+}
+
func (c *PubSub) resubscribe(cn *pool.Conn) error {
var firstErr error
if len(c.channels) > 0 {
- channels := mapKeys(c.channels)
- err := c._subscribe(cn, "subscribe", channels...)
+ err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
if err != nil && firstErr == nil {
firstErr = err
}
}
if len(c.patterns) > 0 {
- patterns := mapKeys(c.patterns)
- err := c._subscribe(cn, "psubscribe", patterns...)
+ err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
if err != nil && firstErr == nil {
firstErr = err
}
@@ -101,51 +111,48 @@ func mapKeys(m map[string]struct{}) []string {
return s
}
-func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
- args := make([]interface{}, 1+len(channels))
- args[0] = redisCmd
- for i, channel := range channels {
- args[1+i] = channel
+func (c *PubSub) _subscribe(
+ cn *pool.Conn, redisCmd string, channels []string,
+) error {
+ args := make([]interface{}, 0, 1+len(channels))
+ args = append(args, redisCmd)
+ for _, channel := range channels {
+ args = append(args, channel)
}
cmd := NewSliceCmd(args...)
-
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- return writeCmd(cn, cmd)
+ return c.writeCmd(cn, cmd)
}
-func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
+func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
c.mu.Lock()
- c._releaseConn(cn, err)
+ c._releaseConn(cn, err, allowTimeout)
c.mu.Unlock()
}
-func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
+func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
if c.cn != cn {
return
}
- if internal.IsBadConn(err, true) {
- c._reconnect()
+ if internal.IsBadConn(err, allowTimeout) {
+ c._reconnect(err)
}
}
-func (c *PubSub) _closeTheCn() error {
- var err error
- if c.cn != nil {
- err = c.closeConn(c.cn)
- c.cn = nil
- }
- return err
-}
-
-func (c *PubSub) reconnect() {
- c.mu.Lock()
- c._reconnect()
- c.mu.Unlock()
+func (c *PubSub) _reconnect(reason error) {
+ _ = c._closeTheCn(reason)
+ _, _ = c._conn(nil)
}
-func (c *PubSub) _reconnect() {
- _ = c._closeTheCn()
- _, _ = c._conn(nil)
+func (c *PubSub) _closeTheCn(reason error) error {
+ if c.cn == nil {
+ return nil
+ }
+ if !c.closed {
+ internal.Logf("redis: discarding bad PubSub connection: %s", reason)
+ }
+ err := c.closeConn(c.cn)
+ c.cn = nil
+ return err
}
func (c *PubSub) Close() error {
@@ -158,7 +165,7 @@ func (c *PubSub) Close() error {
c.closed = true
close(c.exit)
- err := c._closeTheCn()
+ err := c._closeTheCn(pool.ErrClosed)
return err
}
@@ -172,8 +179,8 @@ func (c *PubSub) Subscribe(channels ...string) error {
if c.channels == nil {
c.channels = make(map[string]struct{})
}
- for _, channel := range channels {
- c.channels[channel] = struct{}{}
+ for _, s := range channels {
+ c.channels[s] = struct{}{}
}
return err
}
@@ -188,8 +195,8 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
if c.patterns == nil {
c.patterns = make(map[string]struct{})
}
- for _, pattern := range patterns {
- c.patterns[pattern] = struct{}{}
+ for _, s := range patterns {
+ c.patterns[s] = struct{}{}
}
return err
}
@@ -200,10 +207,10 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
- err := c.subscribe("unsubscribe", channels...)
for _, channel := range channels {
delete(c.channels, channel)
}
+ err := c.subscribe("unsubscribe", channels...)
return err
}
@@ -213,10 +220,10 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
- err := c.subscribe("punsubscribe", patterns...)
for _, pattern := range patterns {
delete(c.patterns, pattern)
}
+ err := c.subscribe("punsubscribe", patterns...)
return err
}
@@ -226,8 +233,8 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
return err
}
- err = c._subscribe(cn, redisCmd, channels...)
- c._releaseConn(cn, err)
+ err = c._subscribe(cn, redisCmd, channels)
+ c._releaseConn(cn, err, false)
return err
}
@@ -243,9 +250,8 @@ func (c *PubSub) Ping(payload ...string) error {
return err
}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- err = writeCmd(cn, cmd)
- c.releaseConn(cn, err)
+ err = c.writeCmd(cn, cmd)
+ c.releaseConn(cn, err, false)
return err
}
@@ -336,9 +342,11 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
return nil, err
}
- cn.SetReadTimeout(timeout)
- err = c.cmd.readReply(cn)
- c.releaseConn(cn, err)
+ err = cn.WithReader(timeout, func(rd *proto.Reader) error {
+ return c.cmd.readReply(rd)
+ })
+
+ c.releaseConn(cn, err, timeout > 0)
if err != nil {
return nil, err
}
@@ -432,21 +440,26 @@ func (c *PubSub) initChannel() {
timer := time.NewTimer(timeout)
timer.Stop()
- var hasPing bool
+ healthy := true
for {
timer.Reset(timeout)
select {
case <-c.ping:
- hasPing = true
+ healthy = true
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
- if hasPing {
- hasPing = false
- _ = c.Ping()
+ pingErr := c.Ping()
+ if healthy {
+ healthy = false
} else {
- c.reconnect()
+ if pingErr == nil {
+ pingErr = errPingTimeout
+ }
+ c.mu.Lock()
+ c._reconnect(pingErr)
+ c.mu.Unlock()
}
case <-c.exit:
return
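The reworked health check pings the server once per interval and, after a second silent interval, tears the connection down with the ping error (or errPingTimeout) and re-dials. Callers are unaffected; ranging over Channel() keeps working across reconnects. A sketch, assuming a connected *redis.Client and an illustrative channel name:

func consume(client *redis.Client) {
	sub := client.Subscribe("events")
	defer sub.Close()
	// Messages are delivered by the goroutine running initChannel above;
	// bad connections are discarded and re-established transparently.
	for msg := range sub.Channel() {
		fmt.Println(msg.Channel, msg.Payload)
	}
}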
diff --git a/src/dma/vendor/github.com/go-redis/redis/redis.go b/src/dma/vendor/github.com/go-redis/redis/redis.go
index c0f142cc..aca30648 100644
--- a/src/dma/vendor/github.com/go-redis/redis/redis.go
+++ b/src/dma/vendor/github.com/go-redis/redis/redis.go
@@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) {
type baseClient struct {
opt *Options
connPool pool.Pooler
+ limiter Limiter
process func(Cmder) error
processPipeline func([]Cmder) error
@@ -50,7 +51,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return nil, err
}
- if !cn.Inited {
+ if cn.InitedAt.IsZero() {
if err := c.initConn(cn); err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
@@ -61,12 +62,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
}
func (c *baseClient) getConn() (*pool.Conn, error) {
+ if c.limiter != nil {
+ err := c.limiter.Allow()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ cn, err := c._getConn()
+ if err != nil {
+ if c.limiter != nil {
+ c.limiter.ReportResult(err)
+ }
+ return nil, err
+ }
+ return cn, nil
+}
+
+func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get()
if err != nil {
return nil, err
}
- if !cn.Inited {
+ if cn.InitedAt.IsZero() {
err := c.initConn(cn)
if err != nil {
c.connPool.Remove(cn)
@@ -77,18 +96,32 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
return cn, nil
}
-func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
+func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
+ if c.limiter != nil {
+ c.limiter.ReportResult(err)
+ }
+
if internal.IsBadConn(err, false) {
c.connPool.Remove(cn)
- return false
+ } else {
+ c.connPool.Put(cn)
+ }
+}
+
+func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
+ if c.limiter != nil {
+ c.limiter.ReportResult(err)
}
- c.connPool.Put(cn)
- return true
+ if err == nil || internal.IsRedisError(err) {
+ c.connPool.Put(cn)
+ } else {
+ c.connPool.Remove(cn)
+ }
}
func (c *baseClient) initConn(cn *pool.Conn) error {
- cn.Inited = true
+ cn.InitedAt = time.Now()
if c.opt.Password == "" &&
c.opt.DB == 0 &&
@@ -123,8 +156,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
return nil
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *baseClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ _ = c.Process(cmd)
+ return cmd
+}
+
// WrapProcess wraps function that processes Redis commands.
-func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
+func (c *baseClient) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
c.process = fn(c.process)
}
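Do pairs with the new Cmd helpers (String, Int64, Bool, ...) for commands that lack a typed wrapper. A sketch, assuming a connected client:

func dbSize(client *redis.Client) (int64, error) {
	// DBSIZE has an integer reply, so Cmd.Int64 decodes it directly.
	return client.Do("dbsize").Int64()
}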
@@ -147,8 +189,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmd); err != nil {
+ err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+ if err != nil {
c.releaseConn(cn, err)
cmd.setErr(err)
if internal.IsRetryableError(err, true) {
@@ -157,8 +201,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
- cn.SetReadTimeout(c.cmdTimeout(cmd))
- err = cmd.readReply(cn)
+ err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
+ return cmd.readReply(rd)
+ })
c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
@@ -176,7 +221,11 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
- return readTimeout(*timeout)
+ t := *timeout
+ if t == 0 {
+ return 0
+ }
+ return t + 10*time.Second
}
return c.opt.ReadTimeout
}
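So a blocking command's socket read deadline is now its server-side timeout plus 10 seconds of slack, and a zero timeout (block forever) clears the deadline entirely. A sketch with an illustrative key:

func popWithTimeout(client *redis.Client) {
	// BLPOP blocks server-side for up to 5s; the client applies ~15s
	// (5s + 10s slack) as the read deadline. BLPop(0, ...) would block
	// forever with no read deadline at all.
	vals, err := client.BLPop(5*time.Second, "queue").Result()
	if err == redis.Nil {
		return // timed out without an element
	}
	fmt.Println(vals)
}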
@@ -232,35 +281,33 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
}
canRetry, err := p(cn, cmds)
-
- if err == nil || internal.IsRedisError(err) {
- c.connPool.Put(cn)
- break
- }
- c.connPool.Remove(cn)
+ c.releaseConnStrict(cn, err)
if !canRetry || !internal.IsRetryableError(err, true) {
break
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmds...); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
return true, err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
- return true, pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return pipelineReadCmds(rd, cmds)
+ })
+ return true, err
}
-func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
+func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
}
@@ -269,47 +316,50 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
return true, err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds); err != nil {
- setCmdsErr(cmds, err)
- return false, err
- }
-
- return false, pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := txPipelineReadQueued(rd, cmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return false, err
}
-func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
multiExec := make([]Cmder, 0, len(cmds)+2)
multiExec = append(multiExec, NewStatusCmd("MULTI"))
multiExec = append(multiExec, cmds...)
multiExec = append(multiExec, NewSliceCmd("EXEC"))
- return writeCmd(cn, multiExec...)
+ return writeCmd(wr, multiExec...)
}
-func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ err := statusCmd.readReply(rd)
+ if err != nil {
return err
}
- for _ = range cmds {
- err := statusCmd.readReply(cn)
+ for range cmds {
+ err = statusCmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
}
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -373,12 +423,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
- c2 := c.copy()
+ c2 := c.clone()
c2.ctx = ctx
return c2
}
-func (c *Client) copy() *Client {
+func (c *Client) clone() *Client {
cp := *c
cp.init()
return &cp
@@ -389,6 +439,11 @@ func (c *Client) Options() *Options {
return c.opt
}
+func (c *Client) SetLimiter(l Limiter) *Client {
+ c.limiter = l
+ return c
+}
+
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
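The Limiter contract can be read off the call sites above: Allow gates every connection acquisition and ReportResult observes each outcome. A hedged sketch of a failure-counting limiter; the type, threshold, and reset policy are illustrative, not part of the library:

type failureLimiter struct {
	mu       sync.Mutex
	failures int
}

func (l *failureLimiter) Allow() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.failures > 100 { // illustrative trip point
		return errors.New("limiter: too many consecutive failures")
	}
	return nil
}

func (l *failureLimiter) ReportResult(result error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if result != nil {
		l.failures++
	} else {
		l.failures = 0
	}
}

Wired up with client := redis.NewClient(opt).SetLimiter(&failureLimiter{}).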
@@ -437,6 +492,30 @@ func (c *Client) pubSub() *PubSub {
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
+// Note that this method does not wait on a response from Redis, so the
+// subscription may not be active immediately. To force the connection to wait,
+// you may call the Receive() method on the returned *PubSub like so:
+//
+// sub := client.Subscribe(queryResp)
+// iface, err := sub.Receive()
+// if err != nil {
+// // handle error
+// }
+//
+// // Should be *Subscription, but others are possible if other actions have been
+// // taken on sub since it was created.
+// switch iface.(type) {
+// case *Subscription:
+// // subscribe succeeded
+// case *Message:
+// // received first message
+// case *Pong:
+// // pong received
+// default:
+// // handle error
+// }
+//
+// ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
diff --git a/src/dma/vendor/github.com/go-redis/redis/result.go b/src/dma/vendor/github.com/go-redis/redis/result.go
index e086e8e3..e438f260 100644
--- a/src/dma/vendor/github.com/go-redis/redis/result.go
+++ b/src/dma/vendor/github.com/go-redis/redis/result.go
@@ -53,7 +53,7 @@ func NewBoolResult(val bool, err error) *BoolCmd {
// NewStringResult returns a StringCmd initialised with val and err for testing
func NewStringResult(val string, err error) *StringCmd {
var cmd StringCmd
- cmd.val = []byte(val)
+ cmd.val = val
cmd.setErr(err)
return &cmd
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/ring.go b/src/dma/vendor/github.com/go-redis/redis/ring.go
index ef855115..250e5f64 100644
--- a/src/dma/vendor/github.com/go-redis/redis/ring.go
+++ b/src/dma/vendor/github.com/go-redis/redis/ring.go
@@ -68,6 +68,8 @@ type RingOptions struct {
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options {
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
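These pool knobs flow through clientOptions straight into each shard's client. A configuration sketch with illustrative values:

ring := redis.NewRing(&redis.RingOptions{
	Addrs: map[string]string{
		"shard1": "localhost:6379",
		"shard2": "localhost:6380",
	},
	MinIdleConns: 10,               // keep warm connections per shard
	MaxConnAge:   30 * time.Minute, // recycle long-lived connections
})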
@@ -315,12 +319,12 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------
-// Ring is a Redis client that uses constistent hashing to distribute
+// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
-// the ring. When shard comes online it is added back to the ring. This
+// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
@@ -338,6 +342,7 @@ type Ring struct {
shards *ringShards
cmdsInfoCache *cmdsInfoCache
+ process func(Cmder) error
processPipeline func([]Cmder) error
}
@@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring {
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
+ ring.process = ring.defaultProcess
ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
@@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats {
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
}
return &acc
}
@@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shards.GetByKey(firstKey)
}
-func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
- c.ForEachShard(func(c *Client) error {
- c.WrapProcess(fn)
- return nil
- })
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
+func (c *Ring) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
+ c.process = fn(c.process)
}
func (c *Ring) Process(cmd Cmder) error {
- shard, err := c.cmdShard(cmd)
- if err != nil {
- cmd.setErr(err)
- return err
+ return c.process(cmd)
+}
+
+func (c *Ring) defaultProcess(cmd Cmder) error {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
+ shard, err := c.cmdShard(cmd)
+ if err != nil {
+ cmd.setErr(err)
+ return err
+ }
+
+ err = shard.Client.Process(cmd)
+ if err == nil {
+ return nil
+ }
+ if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+ return err
+ }
}
- return shard.Client.Process(cmd)
+ return cmd.Err()
}
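Because WrapProcess now wraps the ring-level loop instead of each shard client, middleware sees a command exactly once, before shard selection and retries. A logging sketch, assuming the log and time packages:

ring.WrapProcess(func(old func(redis.Cmder) error) func(redis.Cmder) error {
	return func(cmd redis.Cmder) error {
		start := time.Now()
		err := old(cmd) // runs defaultProcess: shard pick plus retry loop
		log.Printf("ring: %s took %s", cmd.Name(), time.Since(start))
		return err
	}
})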
func (c *Ring) Pipeline() Pipeliner {
@@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
+ var mu sync.Mutex
var failedCmdsMap map[string][]Cmder
+ var wg sync.WaitGroup
for hash, cmds := range cmdsMap {
- shard, err := c.shards.GetByHash(hash)
- if err != nil {
- setCmdsErr(cmds, err)
- continue
- }
+ wg.Add(1)
+ go func(hash string, cmds []Cmder) {
+ defer wg.Done()
+
+ shard, err := c.shards.GetByHash(hash)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
- cn, err := shard.Client.getConn()
- if err != nil {
- setCmdsErr(cmds, err)
- continue
- }
+ cn, err := shard.Client.getConn()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
- canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
- if err == nil || internal.IsRedisError(err) {
- shard.Client.connPool.Put(cn)
- continue
- }
- shard.Client.connPool.Remove(cn)
+ canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
+ shard.Client.releaseConnStrict(cn, err)
- if canRetry && internal.IsRetryableError(err, true) {
- if failedCmdsMap == nil {
- failedCmdsMap = make(map[string][]Cmder)
+ if canRetry && internal.IsRetryableError(err, true) {
+ mu.Lock()
+ if failedCmdsMap == nil {
+ failedCmdsMap = make(map[string][]Cmder)
+ }
+ failedCmdsMap[hash] = cmds
+ mu.Unlock()
}
- failedCmdsMap[hash] = cmds
- }
+ }(hash, cmds)
}
+ wg.Wait()
if len(failedCmdsMap) == 0 {
break
}
cmdsMap = failedCmdsMap
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
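Pipelining is unchanged for callers; the difference is that each shard's batch is now flushed in its own goroutine, with retryable failures collected under the mutex for the next attempt. A usage sketch with illustrative keys:

cmds, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
	pipe.Set("a", "1", 0) // these may land on different shards and
	pipe.Set("b", "2", 0) // be written/read concurrently
	return nil
})
_, _ = cmds, err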
func (c *Ring) TxPipeline() Pipeliner {
diff --git a/src/dma/vendor/github.com/go-redis/redis/sentinel.go b/src/dma/vendor/github.com/go-redis/redis/sentinel.go
index 12c29a71..7cbb90bd 100644
--- a/src/dma/vendor/github.com/go-redis/redis/sentinel.go
+++ b/src/dma/vendor/github.com/go-redis/redis/sentinel.go
@@ -29,13 +29,17 @@ type FailoverOptions struct {
Password string
DB int
- MaxRetries int
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
},
}
c.baseClient.init()
- c.setProcessor(c.Process)
+ c.cmdable.setProcessor(c.Process)
return &c
}
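The new retry and pool fields are honored by failover clients too. A construction sketch with illustrative addresses:

client := redis.NewFailoverClient(&redis.FailoverOptions{
	MasterName:      "mymaster",
	SentinelAddrs:   []string{"localhost:26379"},
	MinRetryBackoff: 8 * time.Millisecond,
	MaxRetryBackoff: 512 * time.Millisecond,
	MinIdleConns:    5,
	MaxConnAge:      time.Hour,
})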
@@ -115,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
return c
}
-func (c *SentinelClient) PubSub() *PubSub {
+func (c *SentinelClient) pubSub() *PubSub {
pubsub := &PubSub{
opt: c.opt,
@@ -128,14 +132,52 @@ func (c *SentinelClient) PubSub() *PubSub {
return pubsub
}
+// Subscribe subscribes the client to the specified channels.
+// Channels can be omitted to create an empty subscription.
+func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub()
+ if len(channels) > 0 {
+ _ = pubsub.Subscribe(channels...)
+ }
+ return pubsub
+}
+
+// PSubscribe subscribes the client to the given patterns.
+// Patterns can be omitted to create an empty subscription.
+func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub()
+ if len(channels) > 0 {
+ _ = pubsub.PSubscribe(channels...)
+ }
+ return pubsub
+}
+
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
- cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
+ cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
c.Process(cmd)
return cmd
}
func (c *SentinelClient) Sentinels(name string) *SliceCmd {
- cmd := NewSliceCmd("SENTINEL", "sentinels", name)
+ cmd := NewSliceCmd("sentinel", "sentinels", name)
+ c.Process(cmd)
+ return cmd
+}
+
+// Failover forces a failover as if the master was not reachable, and without
+// asking other Sentinels for agreement.
+func (c *SentinelClient) Failover(name string) *StatusCmd {
+ cmd := NewStatusCmd("sentinel", "failover", name)
+ c.Process(cmd)
+ return cmd
+}
+
+// Reset resets all the masters with matching name. The pattern argument is a
+// glob-style pattern. The reset process clears any previous state in a master
+// (including a failover in progress), and removes every slave and sentinel
+// already discovered and associated with the master.
+func (c *SentinelClient) Reset(pattern string) *IntCmd {
+ cmd := NewIntCmd("sentinel", "reset", pattern)
c.Process(cmd)
return cmd
}
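Together these helpers make administrative scripting against Sentinel straightforward. A sketch with an illustrative address and master name:

sc := redis.NewSentinelClient(&redis.Options{Addr: "localhost:26379"})
if err := sc.Failover("mymaster").Err(); err != nil {
	log.Printf("failover failed: %v", err)
}
n, err := sc.Reset("mymaster").Result() // glob-style pattern
log.Printf("reset %d masters, err=%v", n, err)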
@@ -152,79 +194,81 @@ type sentinelFailover struct {
masterName string
_masterAddr string
sentinel *SentinelClient
+ pubsub *PubSub
}
-func (d *sentinelFailover) Close() error {
- return d.resetSentinel()
+func (c *sentinelFailover) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.sentinel != nil {
+ return c.closeSentinel()
+ }
+ return nil
}
-func (d *sentinelFailover) Pool() *pool.ConnPool {
- d.poolOnce.Do(func() {
- d.opt.Dialer = d.dial
- d.pool = newConnPool(d.opt)
+func (c *sentinelFailover) Pool() *pool.ConnPool {
+ c.poolOnce.Do(func() {
+ c.opt.Dialer = c.dial
+ c.pool = newConnPool(c.opt)
})
- return d.pool
+ return c.pool
}
-func (d *sentinelFailover) dial() (net.Conn, error) {
- addr, err := d.MasterAddr()
+func (c *sentinelFailover) dial() (net.Conn, error) {
+ addr, err := c.MasterAddr()
if err != nil {
return nil, err
}
- return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
+ return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
}
-func (d *sentinelFailover) MasterAddr() (string, error) {
- d.mu.Lock()
- defer d.mu.Unlock()
-
- addr, err := d.masterAddr()
+func (c *sentinelFailover) MasterAddr() (string, error) {
+ addr, err := c.masterAddr()
if err != nil {
return "", err
}
- d._switchMaster(addr)
-
+ c.switchMaster(addr)
return addr, nil
}
-func (d *sentinelFailover) masterAddr() (string, error) {
- // Try last working sentinel.
- if d.sentinel != nil {
- addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
- if err == nil {
- addr := net.JoinHostPort(addr[0], addr[1])
- return addr, nil
- }
-
- internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
- d.masterName, err)
- d._resetSentinel()
+func (c *sentinelFailover) masterAddr() (string, error) {
+ addr := c.getMasterAddr()
+ if addr != "" {
+ return addr, nil
}
- for i, sentinelAddr := range d.sentinelAddrs {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
- DialTimeout: d.opt.DialTimeout,
- ReadTimeout: d.opt.ReadTimeout,
- WriteTimeout: d.opt.WriteTimeout,
+ MaxRetries: c.opt.MaxRetries,
+
+ DialTimeout: c.opt.DialTimeout,
+ ReadTimeout: c.opt.ReadTimeout,
+ WriteTimeout: c.opt.WriteTimeout,
- PoolSize: d.opt.PoolSize,
- PoolTimeout: d.opt.PoolTimeout,
- IdleTimeout: d.opt.IdleTimeout,
+ PoolSize: c.opt.PoolSize,
+ PoolTimeout: c.opt.PoolTimeout,
+ IdleTimeout: c.opt.IdleTimeout,
+ IdleCheckFrequency: c.opt.IdleCheckFrequency,
+
+ TLSConfig: c.opt.TLSConfig,
})
- masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
+ masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
- d.masterName, err)
- sentinel.Close()
+ c.masterName, err)
+ _ = sentinel.Close()
continue
}
// Push working sentinel to the top.
- d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
- d.setSentinel(sentinel)
+ c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
+ c.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
@@ -233,17 +277,41 @@ func (d *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable")
}
-func (c *sentinelFailover) switchMaster(addr string) {
- c.mu.Lock()
- c._switchMaster(addr)
- c.mu.Unlock()
+func (c *sentinelFailover) getMasterAddr() string {
+ c.mu.RLock()
+ sentinel := c.sentinel
+ c.mu.RUnlock()
+
+ if sentinel == nil {
+ return ""
+ }
+
+ addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
+ if err != nil {
+ internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
+ c.masterName, err)
+ c.mu.Lock()
+ if c.sentinel == sentinel {
+ c.closeSentinel()
+ }
+ c.mu.Unlock()
+ return ""
+ }
+
+ return net.JoinHostPort(addr[0], addr[1])
}
-func (c *sentinelFailover) _switchMaster(addr string) {
- if c._masterAddr == addr {
+func (c *sentinelFailover) switchMaster(addr string) {
+ c.mu.RLock()
+ masterAddr := c._masterAddr
+ c.mu.RUnlock()
+ if masterAddr == addr {
return
}
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
internal.Logf("sentinel: new master=%q addr=%q",
c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
@@ -252,32 +320,36 @@ func (c *sentinelFailover) _switchMaster(addr string) {
c._masterAddr = addr
}
-func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
- d.discoverSentinels(sentinel)
- d.sentinel = sentinel
- go d.listen(sentinel)
+func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
+ c.discoverSentinels(sentinel)
+ c.sentinel = sentinel
+
+ c.pubsub = sentinel.Subscribe("+switch-master")
+ go c.listen(c.pubsub)
}
-func (d *sentinelFailover) resetSentinel() error {
- var err error
- d.mu.Lock()
- if d.sentinel != nil {
- err = d._resetSentinel()
+func (c *sentinelFailover) closeSentinel() error {
+ var firstErr error
+
+ err := c.pubsub.Close()
+	if err != nil && firstErr == nil {
+ firstErr = err
}
- d.mu.Unlock()
- return err
-}
+ c.pubsub = nil
-func (d *sentinelFailover) _resetSentinel() error {
- err := d.sentinel.Close()
- d.sentinel = nil
- return err
+ err = c.sentinel.Close()
+	if err != nil && firstErr == nil {
+ firstErr = err
+ }
+ c.sentinel = nil
+
+ return firstErr
}
-func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
- sentinels, err := sentinel.Sentinels(d.masterName).Result()
+func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
+ sentinels, err := sentinel.Sentinels(c.masterName).Result()
if err != nil {
- internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
+ internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
return
}
for _, sentinel := range sentinels {
@@ -286,49 +358,33 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
key := vals[i].(string)
if key == "name" {
sentinelAddr := vals[i+1].(string)
- if !contains(d.sentinelAddrs, sentinelAddr) {
- internal.Logf(
- "sentinel: discovered new sentinel=%q for master=%q",
- sentinelAddr, d.masterName,
- )
- d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
+ if !contains(c.sentinelAddrs, sentinelAddr) {
+ internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
+ sentinelAddr, c.masterName)
+ c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
}
}
}
}
}
-func (d *sentinelFailover) listen(sentinel *SentinelClient) {
- pubsub := sentinel.PubSub()
- defer pubsub.Close()
-
- err := pubsub.Subscribe("+switch-master")
- if err != nil {
- internal.Logf("sentinel: Subscribe failed: %s", err)
- d.resetSentinel()
- return
- }
-
+func (c *sentinelFailover) listen(pubsub *PubSub) {
+ ch := pubsub.Channel()
for {
- msg, err := pubsub.ReceiveMessage()
- if err != nil {
- if err == pool.ErrClosed {
- d.resetSentinel()
- return
- }
- internal.Logf("sentinel: ReceiveMessage failed: %s", err)
- continue
+ msg, ok := <-ch
+ if !ok {
+ break
}
switch msg.Channel {
case "+switch-master":
parts := strings.Split(msg.Payload, " ")
- if parts[0] != d.masterName {
+ if parts[0] != c.masterName {
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
- d.switchMaster(addr)
+ c.switchMaster(addr)
}
}
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/tx.go b/src/dma/vendor/github.com/go-redis/redis/tx.go
index 6a7da99d..fb3e6331 100644
--- a/src/dma/vendor/github.com/go-redis/redis/tx.go
+++ b/src/dma/vendor/github.com/go-redis/redis/tx.go
@@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx {
return &tx
}
-// Watch prepares a transcaction and marks the keys to be watched
+// Watch prepares a transaction and marks the keys to be watched
// for conditional execution if there are any keys.
//
-// The transaction is automatically closed when the fn exits.
+// The transaction is automatically closed when fn exits.
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
tx := c.newTx()
if len(keys) > 0 {
diff --git a/src/dma/vendor/github.com/go-redis/redis/universal.go b/src/dma/vendor/github.com/go-redis/redis/universal.go
index 9e30c81d..a6075624 100644
--- a/src/dma/vendor/github.com/go-redis/redis/universal.go
+++ b/src/dma/vendor/github.com/go-redis/redis/universal.go
@@ -12,35 +12,38 @@ type UniversalOptions struct {
// of cluster/sentinel nodes.
Addrs []string
- // The sentinel master name.
- // Only failover clients.
- MasterName string
-
// Database to be selected after connecting to the server.
// Only single-node and failover clients.
DB int
- // Only cluster clients.
-
- // Enables read only queries on slave nodes.
- ReadOnly bool
-
- MaxRedirects int
- RouteByLatency bool
-
- // Common options
+ // Common options.
OnConnect func(*Conn) error
- MaxRetries int
Password string
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
TLSConfig *tls.Config
+
+ // Only cluster clients.
+
+ MaxRedirects int
+ ReadOnly bool
+ RouteByLatency bool
+ RouteRandomly bool
+
+ // The sentinel master name.
+ // Only failover clients.
+ MasterName string
}
func (o *UniversalOptions) cluster() *ClusterOptions {
@@ -49,22 +52,31 @@ func (o *UniversalOptions) cluster() *ClusterOptions {
}
return &ClusterOptions{
- Addrs: o.Addrs,
+ Addrs: o.Addrs,
+ OnConnect: o.OnConnect,
+
+ Password: o.Password,
+
MaxRedirects: o.MaxRedirects,
- RouteByLatency: o.RouteByLatency,
ReadOnly: o.ReadOnly,
+ RouteByLatency: o.RouteByLatency,
+ RouteRandomly: o.RouteRandomly,
+
+ MaxRetries: o.MaxRetries,
+ MinRetryBackoff: o.MinRetryBackoff,
+ MaxRetryBackoff: o.MaxRetryBackoff,
- OnConnect: o.OnConnect,
- MaxRetries: o.MaxRetries,
- Password: o.Password,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
+ MinIdleConns: o.MinIdleConns,
+ MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
- TLSConfig: o.TLSConfig,
+
+ TLSConfig: o.TLSConfig,
}
}
@@ -76,19 +88,27 @@ func (o *UniversalOptions) failover() *FailoverOptions {
return &FailoverOptions{
SentinelAddrs: o.Addrs,
MasterName: o.MasterName,
- DB: o.DB,
+ OnConnect: o.OnConnect,
+
+ DB: o.DB,
+ Password: o.Password,
+
+ MaxRetries: o.MaxRetries,
+ MinRetryBackoff: o.MinRetryBackoff,
+ MaxRetryBackoff: o.MaxRetryBackoff,
+
+ DialTimeout: o.DialTimeout,
+ ReadTimeout: o.ReadTimeout,
+ WriteTimeout: o.WriteTimeout,
- OnConnect: o.OnConnect,
- MaxRetries: o.MaxRetries,
- Password: o.Password,
- DialTimeout: o.DialTimeout,
- ReadTimeout: o.ReadTimeout,
- WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
+ MinIdleConns: o.MinIdleConns,
+ MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
- TLSConfig: o.TLSConfig,
+
+ TLSConfig: o.TLSConfig,
}
}
@@ -99,20 +119,28 @@ func (o *UniversalOptions) simple() *Options {
}
return &Options{
- Addr: addr,
- DB: o.DB,
+ Addr: addr,
+ OnConnect: o.OnConnect,
+
+ DB: o.DB,
+ Password: o.Password,
+
+ MaxRetries: o.MaxRetries,
+ MinRetryBackoff: o.MinRetryBackoff,
+ MaxRetryBackoff: o.MaxRetryBackoff,
+
+ DialTimeout: o.DialTimeout,
+ ReadTimeout: o.ReadTimeout,
+ WriteTimeout: o.WriteTimeout,
- OnConnect: o.OnConnect,
- MaxRetries: o.MaxRetries,
- Password: o.Password,
- DialTimeout: o.DialTimeout,
- ReadTimeout: o.ReadTimeout,
- WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
+ MinIdleConns: o.MinIdleConns,
+ MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
- TLSConfig: o.TLSConfig,
+
+ TLSConfig: o.TLSConfig,
}
}
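These three translations feed NewUniversalClient, which picks the client shape from the options: a set MasterName yields a failover client, multiple Addrs a cluster client, and otherwise a single-node client. A sketch with illustrative values:

client := redis.NewUniversalClient(&redis.UniversalOptions{
	Addrs:        []string{"localhost:6379"},
	MinIdleConns: 10,
	MaxConnAge:   time.Hour,
})
defer client.Close()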