diff options
Diffstat (limited to 'src/dma/vendor/github.com/go-redis')
25 files changed, 2338 insertions, 1473 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/.travis.yml b/src/dma/vendor/github.com/go-redis/redis/.travis.yml index 39ffc2be..6b110b4c 100644 --- a/src/dma/vendor/github.com/go-redis/redis/.travis.yml +++ b/src/dma/vendor/github.com/go-redis/redis/.travis.yml @@ -5,10 +5,9 @@ services: - redis-server go: - - 1.7.x - - 1.8.x - 1.9.x - 1.10.x + - 1.11.x - tip matrix: diff --git a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md index cb0e1b8e..19645661 100644 --- a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md +++ b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## Unreleased + +- Cluster and Ring pipelines process commands for each node in its own goroutine. + +## 6.14 + +- Added Options.MinIdleConns. +- Added Options.MaxConnAge. +- PoolStats.FreeConns is renamed to PoolStats.IdleConns. +- Add Client.Do to simplify creating custom commands. +- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. +- Lower memory usage. + ## v6.13 - Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. diff --git a/src/dma/vendor/github.com/go-redis/redis/Makefile b/src/dma/vendor/github.com/go-redis/redis/Makefile index 1fbdac91..fa3b4e00 100644 --- a/src/dma/vendor/github.com/go-redis/redis/Makefile +++ b/src/dma/vendor/github.com/go-redis/redis/Makefile @@ -3,6 +3,8 @@ all: testdeps go test ./... -short -race env GOOS=linux GOARCH=386 go test ./... go vet + go get github.com/gordonklaus/ineffassign + ineffassign . testdeps: testdata/redis/src/redis-server @@ -13,7 +15,7 @@ bench: testdeps testdata/redis: mkdir -p $@ - wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@ + wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@ testdata/redis/src/redis-server: testdata/redis sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile diff --git a/src/dma/vendor/github.com/go-redis/redis/cluster.go b/src/dma/vendor/github.com/go-redis/redis/cluster.go index 7a1af143..0cecc62c 100644 --- a/src/dma/vendor/github.com/go-redis/redis/cluster.go +++ b/src/dma/vendor/github.com/go-redis/redis/cluster.go @@ -3,11 +3,11 @@ package redis import ( "context" "crypto/tls" - "errors" "fmt" "math" "math/rand" "net" + "runtime" "sort" "sync" "sync/atomic" @@ -17,7 +17,6 @@ import ( "github.com/go-redis/redis/internal/hashtag" "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" - "github.com/go-redis/redis/internal/singleflight" ) var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") @@ -49,14 +48,18 @@ type ClusterOptions struct { // and Cluster.ReloadState to manually trigger state reloading. ClusterSlots func() ([]ClusterSlot, error) + // Optional hook that is called when a new node is created. + OnNewNode func(*Client) + // Following options are copied from Options struct. OnConnect func(*Conn) error + Password string + MaxRetries int MinRetryBackoff time.Duration MaxRetryBackoff time.Duration - Password string DialTimeout time.Duration ReadTimeout time.Duration @@ -64,6 +67,8 @@ type ClusterOptions struct { // PoolSize applies per cluster node and not for the whole cluster. PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -78,10 +83,14 @@ func (opt *ClusterOptions) init() { opt.MaxRedirects = 8 } - if opt.RouteByLatency || opt.RouteRandomly { + if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil { opt.ReadOnly = true } + if opt.PoolSize == 0 { + opt.PoolSize = 5 * runtime.NumCPU() + } + switch opt.ReadTimeout { case -1: opt.ReadTimeout = 0 @@ -125,10 +134,11 @@ func (opt *ClusterOptions) clientOptions() *Options { ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, - PoolSize: opt.PoolSize, - PoolTimeout: opt.PoolTimeout, - IdleTimeout: opt.IdleTimeout, - + PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, + PoolTimeout: opt.PoolTimeout, + IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: disableIdleCheck, TLSConfig: opt.TLSConfig, @@ -157,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { go node.updateLatency() } + if clOpt.OnNewNode != nil { + clOpt.OnNewNode(node.Client) + } + return &node } @@ -228,8 +242,6 @@ type clusterNodes struct { clusterAddrs []string closed bool - nodeCreateGroup singleflight.Group - _generation uint32 // atomic } @@ -332,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { return node, nil } - v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) { - node := newClusterNode(c.opt, addr) - return node, nil - }) - c.mu.Lock() defer c.mu.Unlock() @@ -346,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { node, ok := c.allNodes[addr] if ok { - _ = v.(*clusterNode).Close() return node, err } - node = v.(*clusterNode) + + node = newClusterNode(c.opt, addr) c.allAddrs = appendIfNotExists(c.allAddrs, addr) - if err == nil { - c.clusterAddrs = append(c.clusterAddrs, addr) - } + c.clusterAddrs = append(c.clusterAddrs, addr) c.allNodes[addr] = node return node, err @@ -429,13 +434,15 @@ func newClusterState( createdAt: time.Now(), } - isLoopbackOrigin := isLoopbackAddr(origin) + originHost, _, _ := net.SplitHostPort(origin) + isLoopbackOrigin := isLoopback(originHost) + for _, slot := range slots { var nodes []*clusterNode for i, slotNode := range slot.Nodes { addr := slotNode.Addr - if !isLoopbackOrigin && useOriginAddr(origin, addr) { - addr = origin + if !isLoopbackOrigin { + addr = replaceLoopbackHost(addr, originHost) } node, err := c.nodes.GetOrCreate(addr) @@ -469,6 +476,33 @@ func newClusterState( return &c, nil } +func replaceLoopbackHost(nodeAddr, originHost string) string { + nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) + if err != nil { + return nodeAddr + } + + nodeIP := net.ParseIP(nodeHost) + if nodeIP == nil { + return nodeAddr + } + + if !nodeIP.IsLoopback() { + return nodeAddr + } + + // Use origin host which is not loopback and node port. + return net.JoinHostPort(originHost, nodePort) +} + +func isLoopback(host string) bool { + ip := net.ParseIP(host) + if ip == nil { + return true + } + return ip.IsLoopback() +} + func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) { nodes := c.slotNodes(slot) if len(nodes) > 0 { @@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { n := rand.Intn(len(nodes)-1) + 1 slave = nodes[n] if !slave.Loading() { - break + return slave, nil } } - return slave, nil + + // All slaves are loading - use master. + return nodes[0], nil } } @@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { return nil } -func (c *clusterState) IsConsistent() bool { - if c.nodes.opt.ClusterSlots != nil { - return true - } - return len(c.Masters) <= len(c.Slaves) -} - //------------------------------------------------------------------------------ type clusterStateHolder struct { load func() (*clusterState, error) - state atomic.Value - - firstErrMu sync.RWMutex - firstErr error - + state atomic.Value reloading uint32 // atomic } @@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder } func (c *clusterStateHolder) Reload() (*clusterState, error) { - state, err := c.reload() - if err != nil { - return nil, err - } - if !state.IsConsistent() { - time.AfterFunc(time.Second, c.LazyReload) - } - return state, nil -} - -func (c *clusterStateHolder) reload() (*clusterState, error) { state, err := c.load() if err != nil { - c.firstErrMu.Lock() - if c.firstErr == nil { - c.firstErr = err - } - c.firstErrMu.Unlock() return nil, err } c.state.Store(state) @@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - for { - state, err := c.reload() - if err != nil { - return - } - time.Sleep(100 * time.Millisecond) - if state.IsConsistent() { - return - } + _, err := c.Reload() + if err != nil { + return } + time.Sleep(100 * time.Millisecond) }() } @@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) { } return state, nil } - - c.firstErrMu.RLock() - err := c.firstErr - c.firstErrMu.RUnlock() - if err != nil { - return nil, err - } - - return nil, errors.New("redis: cluster has no state") + return c.Reload() } func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { @@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { c.processTxPipeline = c.defaultProcessTxPipeline c.init() - - _, _ = c.state.Reload() - _, _ = c.cmdsInfoCache.Get() - if opt.IdleCheckFrequency > 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } -// ReloadState reloads cluster state. It calls ClusterSlots func +func (c *ClusterClient) init() { + c.cmdable.setProcessor(c.Process) +} + +// ReloadState reloads cluster state. If available it calls ClusterSlots func // to get cluster slots information. func (c *ClusterClient) ReloadState() error { _, err := c.state.Reload() return err } -func (c *ClusterClient) init() { - c.cmdable.setProcessor(c.Process) -} - func (c *ClusterClient) Context() context.Context { if c.ctx != nil { return c.ctx @@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int { } func (c *ClusterClient) cmdSlot(cmd Cmder) int { + args := cmd.Args() + if args[0] == "cluster" && args[1] == "getkeysinslot" { + return args[2].(int) + } + cmdInfo := c.cmdInfo(cmd.Name()) return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) } @@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { } cmdInfo := c.cmdInfo(cmd.Name()) - slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) + slot := c.cmdSlot(cmd) - if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { + if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { if c.opt.RouteByLatency { node, err := state.slotClosestNode(slot) return slot, node, err @@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { if err == nil { break } - - if internal.IsRetryableError(err, true) { + if err != Nil { c.state.LazyReload() - continue } moved, ask, addr := internal.IsMovedError(err) if moved || ask { - c.state.LazyReload() node, err = c.nodes.GetOrCreate(addr) if err != nil { return err @@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } - if err == pool.ErrClosed { + if err == pool.ErrClosed || internal.IsReadOnlyError(err) { node, err = c.slotMasterNode(slot) if err != nil { return err @@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } + if internal.IsRetryableError(err, true) { + continue + } + return err } @@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +// Do creates a Cmd from the args and processes the cmd. +func (c *ClusterClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + func (c *ClusterClient) WrapProcess( fn func(oldProcess func(Cmder) error) func(Cmder) error, ) { @@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { if err == nil { break } + if err != Nil { + c.state.LazyReload() + } - // If slave is loading - read from master. + // If slave is loading - pick another node. if c.opt.ReadOnly && internal.IsLoadingError(err) { node.MarkAsLoading() - continue - } - - if internal.IsRetryableError(err, true) { - c.state.LazyReload() - - // First retry the same node. - if attempt == 0 { - continue - } - - // Second try random node. - node, err = c.nodes.Random() - if err != nil { - break - } + node = nil continue } @@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { var addr string moved, ask, addr = internal.IsMovedError(err) if moved || ask { - c.state.LazyReload() - node, err = c.nodes.GetOrCreate(addr) if err != nil { break @@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { continue } - if err == pool.ErrClosed { + if err == pool.ErrClosed || internal.IsReadOnlyError(err) { node = nil continue } + if internal.IsRetryableError(err, true) { + // First retry the same node. + if attempt == 0 { + continue + } + + // Second try random node. + node, err = c.nodes.Random() + if err != nil { + break + } + continue + } + break } @@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline( } func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { - cmdsMap, err := c.mapCmdsByNode(cmds) + cmdsMap := newCmdsMap() + err := c.mapCmdsByNode(cmds, cmdsMap) if err != nil { setCmdsErr(cmds, err) return err @@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := make(map[*clusterNode][]Cmder) + failedCmds := newCmdsMap() + var wg sync.WaitGroup - for node, cmds := range cmdsMap { - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.remapCmds(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) + for node, cmds := range cmdsMap.m { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return } - continue - } - err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) - if err == nil || internal.IsRedisError(err) { - node.Client.connPool.Put(cn) - } else { - node.Client.connPool.Remove(cn) - } + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + node.Client.releaseConnStrict(cn, err) + }(node, cmds) } - if len(failedCmds) == 0 { + wg.Wait() + if len(failedCmds.m) == 0 { break } cmdsMap = failedCmds } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { +type cmdsMap struct { + mu sync.Mutex + m map[*clusterNode][]Cmder +} + +func newCmdsMap() *cmdsMap { + return &cmdsMap{ + m: make(map[*clusterNode][]Cmder), + } +} + +func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { state, err := c.state.Get() if err != nil { setCmdsErr(cmds, err) - return nil, err + return err } - cmdsMap := make(map[*clusterNode][]Cmder) cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) for _, cmd := range cmds { var node *clusterNode @@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e node, err = state.slotMasterNode(slot) } if err != nil { - return nil, err + return err } - cmdsMap[node] = append(cmdsMap[node], cmd) + cmdsMap.mu.Lock() + cmdsMap.m[node] = append(cmdsMap.m[node], cmd) + cmdsMap.mu.Unlock() } - return cmdsMap, nil + return nil } func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { @@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { return true } -func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { - remappedCmds, err := c.mapCmdsByNode(cmds) - if err != nil { - setCmdsErr(cmds, err) - return - } - - for node, cmds := range remappedCmds { - failedCmds[node] = cmds - } -} - func (c *ClusterClient) pipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { - cn.SetWriteTimeout(c.opt.WriteTimeout) - - err := writeCmd(cn, cmds...) + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) if err != nil { setCmdsErr(cmds, err) - failedCmds[node] = cmds + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() return err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - return c.pipelineReadCmds(cn, cmds, failedCmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return c.pipelineReadCmds(node, rd, cmds, failedCmds) + }) + return err } func (c *ClusterClient) pipelineReadCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { + var firstErr error for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err == nil { continue } @@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds( continue } - return err + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() + if firstErr == nil { + firstErr = err + } } - return nil + return firstErr } func (c *ClusterClient) checkMovedErr( - cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, + cmd Cmder, err error, failedCmds *cmdsMap, ) bool { moved, ask, addr := internal.IsMovedError(err) @@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds[node] = append(failedCmds[node], cmd) + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() return true } @@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Unlock() return true } @@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := make(map[*clusterNode][]Cmder) + failedCmds := newCmdsMap() + var wg sync.WaitGroup for node, cmds := range cmdsMap { - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.remapCmds(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return } - continue - } - err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - if err == nil || internal.IsRedisError(err) { - node.Client.connPool.Put(cn) - } else { - node.Client.connPool.Remove(cn) - } + err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) + node.Client.releaseConnStrict(cn, err) + }(node, cmds) } - if len(failedCmds) == 0 { + wg.Wait() + if len(failedCmds.m) == 0 { break } - cmdsMap = failedCmds + cmdsMap = failedCmds.m } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { @@ -1429,37 +1453,41 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } func (c *ClusterClient) txPipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { - setCmdsErr(cmds, err) - failedCmds[node] = cmds - return err - } - - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { setCmdsErr(cmds, err) + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() return err } - return pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := c.txPipelineReadQueued(rd, cmds, failedCmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return err } func (c *ClusterClient) txPipelineReadQueued( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + if err := statusCmd.readReply(rd); err != nil { return err } for _, cmd := range cmds { - err := statusCmd.readReply(cn) + err := statusCmd.readReply(rd) if err == nil { continue } @@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued( } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr @@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued( return nil } -func (c *ClusterClient) pubSub(channels []string) *PubSub { +func (c *ClusterClient) pubSub() *PubSub { var node *clusterNode pubsub := &PubSub{ opt: c.opt.clientOptions(), newConn: func(channels []string) (*pool.Conn, error) { - if node == nil { - var slot int - if len(channels) > 0 { - slot = hashtag.Slot(channels[0]) - } else { - slot = -1 - } + if node != nil { + panic("node != nil") + } - masterNode, err := c.slotMasterNode(slot) - if err != nil { - return nil, err - } - node = masterNode + slot := hashtag.Slot(channels[0]) + + var err error + node, err = c.slotMasterNode(slot) + if err != nil { + return nil, err } - return node.Client.newConn() + + cn, err := node.Client.newConn() + if err != nil { + return nil, err + } + + return cn, nil }, closeConn: func(cn *pool.Conn) error { - return node.Client.connPool.CloseConn(cn) + err := node.Client.connPool.CloseConn(cn) + node = nil + return err }, } pubsub.init() + return pubsub } // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. func (c *ClusterClient) Subscribe(channels ...string) *PubSub { - pubsub := c.pubSub(channels) + pubsub := c.pubSub() if len(channels) > 0 { _ = pubsub.Subscribe(channels...) } @@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub { // PSubscribe subscribes the client to the given patterns. // Patterns can be omitted to create empty subscription. func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { - pubsub := c.pubSub(channels) + pubsub := c.pubSub() if len(channels) > 0 { _ = pubsub.PSubscribe(channels...) } return pubsub } -func useOriginAddr(originAddr, nodeAddr string) bool { - nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) - if err != nil { - return false - } - - nodeIP := net.ParseIP(nodeHost) - if nodeIP == nil { - return false - } - - if !nodeIP.IsLoopback() { - return false - } - - _, originPort, err := net.SplitHostPort(originAddr) - if err != nil { - return false - } - - return nodePort == originPort -} - -func isLoopbackAddr(addr string) bool { - host, _, err := net.SplitHostPort(addr) - if err != nil { - return false - } - - ip := net.ParseIP(host) - if ip == nil { - return false - } - - return ip.IsLoopback() -} - func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { for _, n := range nodes { if n == node { diff --git a/src/dma/vendor/github.com/go-redis/redis/command.go b/src/dma/vendor/github.com/go-redis/redis/command.go index 11472bec..cb4f94b1 100644 --- a/src/dma/vendor/github.com/go-redis/redis/command.go +++ b/src/dma/vendor/github.com/go-redis/redis/command.go @@ -1,16 +1,14 @@ package redis import ( - "bytes" "fmt" + "net" "strconv" "strings" "time" "github.com/go-redis/redis/internal" - "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" - "github.com/go-redis/redis/internal/util" ) type Cmder interface { @@ -18,13 +16,12 @@ type Cmder interface { Args() []interface{} stringArg(int) string - readReply(*pool.Conn) error + readReply(rd *proto.Reader) error setErr(error) readTimeout() *time.Duration Err() error - fmt.Stringer } func setCmdsErr(cmds []Cmder, e error) { @@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) { } } -func firstCmdsErr(cmds []Cmder) error { +func cmdsFirstErr(cmds []Cmder) error { for _, cmd := range cmds { if err := cmd.Err(); err != nil { return err @@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error { return nil } -func writeCmd(cn *pool.Conn, cmds ...Cmder) error { - cn.Wb.Reset() +func writeCmd(wr *proto.Writer, cmds ...Cmder) error { for _, cmd := range cmds { - if err := cn.Wb.Append(cmd.Args()); err != nil { + err := wr.WriteArgs(cmd.Args()) + if err != nil { return err } } - - _, err := cn.Write(cn.Wb.Bytes()) - return err + return nil } func cmdString(cmd Cmder, val interface{}) string { @@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) { return cmd.val, cmd.err } -func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) +func (cmd *Cmd) String() (string, error) { + if cmd.err != nil { + return "", cmd.err + } + switch val := cmd.val.(type) { + case string: + return val, nil + default: + err := fmt.Errorf("redis: unexpected type=%T for String", val) + return "", err + } } -func (cmd *Cmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser) +func (cmd *Cmd) Int() (int, error) { if cmd.err != nil { - return cmd.err + return 0, cmd.err } - if b, ok := cmd.val.([]byte); ok { - // Bytes must be copied, because underlying memory is reused. - cmd.val = string(b) + switch val := cmd.val.(type) { + case int64: + return int(val), nil + case string: + return strconv.Atoi(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int", val) + return 0, err } - return nil +} + +func (cmd *Cmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val, nil + case string: + return strconv.ParseInt(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err + } +} + +func (cmd *Cmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return uint64(val), nil + case string: + return strconv.ParseUint(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + return 0, err + } +} + +func (cmd *Cmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Float64", val) + return 0, err + } +} + +func (cmd *Cmd) Bool() (bool, error) { + if cmd.err != nil { + return false, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val != 0, nil + case string: + return strconv.ParseBool(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Bool", val) + return false, err + } +} + +func (cmd *Cmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadReply(sliceParser) + return cmd.err +} + +// Implements proto.MultiBulkParse +func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { + vals := make([]interface{}, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(sliceParser) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue + } + return nil, err + } + + switch v := v.(type) { + case string: + vals = append(vals, v) + default: + vals = append(vals, v) + } + } + return vals, nil } //------------------------------------------------------------------------------ @@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SliceCmd) readReply(cn *pool.Conn) error { +func (cmd *SliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(sliceParser) + v, cmd.err = rd.ReadArrayReply(sliceParser) if cmd.err != nil { return cmd.err } @@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadStringReply() +func (cmd *StatusCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadIntReply() +func (cmd *IntCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadIntReply() return cmd.err } @@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) readReply(cn *pool.Conn) error { +func (cmd *DurationCmd) readReply(rd *proto.Reader) error { var n int64 - n, cmd.err = cn.Rd.ReadIntReply() + n, cmd.err = rd.ReadIntReply() if cmd.err != nil { return cmd.err } @@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *TimeCmd) readReply(cn *pool.Conn) error { +func (cmd *TimeCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(timeParser) + v, cmd.err = rd.ReadArrayReply(timeParser) if cmd.err != nil { return cmd.err } @@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func timeParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d elements, expected 2", n) + } + + sec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + microsec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + return time.Unix(sec, microsec*1000), nil +} + //------------------------------------------------------------------------------ type BoolCmd struct { @@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string { return cmdString(cmd, cmd.val) } -var ok = []byte("OK") - -func (cmd *BoolCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadReply(nil) + v, cmd.err = rd.ReadReply(nil) // `SET key value NX` returns nil when key already exists. But // `SETNX key value` returns bool (0/1). So convert nil to bool. // TODO: is this okay? @@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { case int64: cmd.val = v == 1 return nil - case []byte: - cmd.val = bytes.Equal(v, ok) + case string: + cmd.val = v == "OK" return nil default: cmd.err = fmt.Errorf("got %T, wanted int64 or string", v) @@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { type StringCmd struct { baseCmd - val []byte + val string } var _ Cmder = (*StringCmd)(nil) @@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd { } func (cmd *StringCmd) Val() string { - return util.BytesToString(cmd.val) + return cmd.val } func (cmd *StringCmd) Result() (string, error) { @@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) { } func (cmd *StringCmd) Bytes() ([]byte, error) { - return cmd.val, cmd.err + return []byte(cmd.val), cmd.err +} + +func (cmd *StringCmd) Int() (int, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.Atoi(cmd.Val()) } func (cmd *StringCmd) Int64() (int64, error) { @@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error { if cmd.err != nil { return cmd.err } - return proto.Scan(cmd.val, val) + return proto.Scan([]byte(cmd.val), val) } func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadBytesReply() +func (cmd *StringCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadFloatReply() +func (cmd *FloatCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadFloatReply() return cmd.err } @@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { return proto.ScanSlice(cmd.Val(), container) } -func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser) + v, cmd.err = rd.ReadArrayReply(stringSliceParser) if cmd.err != nil { return cmd.err } @@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ss := make([]string, 0, n) + for i := int64(0); i < n; i++ { + s, err := rd.ReadString() + if err == Nil { + ss = append(ss, "") + } else if err != nil { + return nil, err + } else { + ss = append(ss, s) + } + } + return ss, nil +} + //------------------------------------------------------------------------------ type BoolSliceCmd struct { @@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser) + v, cmd.err = rd.ReadArrayReply(boolSliceParser) if cmd.err != nil { return cmd.err } @@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + bools := make([]bool, 0, n) + for i := int64(0); i < n; i++ { + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + bools = append(bools, n == 1) + } + return bools, nil +} + //------------------------------------------------------------------------------ type StringStringMapCmd struct { @@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser) + v, cmd.err = rd.ReadArrayReply(stringStringMapParser) if cmd.err != nil { return cmd.err } @@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]string, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ type StringIntMapCmd struct { @@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser) + v, cmd.err = rd.ReadArrayReply(stringIntMapParser) if cmd.err != nil { return cmd.err } @@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]int64, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + m[key] = n + } + return m, nil +} + //------------------------------------------------------------------------------ type StringStructMapCmd struct { @@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser) + v, cmd.err = rd.ReadArrayReply(stringStructMapParser) if cmd.err != nil { return cmd.err } @@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { return nil } -//------------------------------------------------------------------------------ +// Implements proto.MultiBulkParse +func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]struct{}, n) + for i := int64(0); i < n; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } -type XStream struct { - Stream string - Messages []*XMessage + m[key] = struct{}{} + } + return m, nil } +//------------------------------------------------------------------------------ + type XMessage struct { ID string Values map[string]interface{} } +type XMessageSliceCmd struct { + baseCmd + + val []XMessage +} + +var _ Cmder = (*XMessageSliceCmd)(nil) + +func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { + return &XMessageSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XMessageSliceCmd) Val() []XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XMessage) + return nil +} + +// Implements proto.MultiBulkParse +func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + msgs := make([]XMessage, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + return nil, err + } + + msgs = append(msgs, XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return msgs, nil +} + +// Implements proto.MultiBulkParse +func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]interface{}, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ +type XStream struct { + Stream string + Messages []XMessage +} + type XStreamSliceCmd struct { baseCmd - val []*XStream + val []XStream } var _ Cmder = (*XStreamSliceCmd)(nil) @@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { } } -func (cmd *XStreamSliceCmd) Val() []*XStream { +func (cmd *XStreamSliceCmd) Val() []XStream { return cmd.val } -func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) { +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { return cmd.val, cmd.err } @@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) + v, cmd.err = rd.ReadArrayReply(xStreamSliceParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XStream) + cmd.val = v.([]XStream) return nil } // Implements proto.MultiBulkParse func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - xx := make([]*XStream, n) + ret := make([]XStream, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xStreamParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + ret = append(ret, XStream{ + Stream: stream, + Messages: v.([]XMessage), + }) + return nil, nil + }) if err != nil { return nil, err } - xx[i] = v.(*XStream) } - return xx, nil + return ret, nil } -// Implements proto.MultiBulkParse -func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) +//------------------------------------------------------------------------------ + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + baseCmd + val *XPending +} + +var _ Cmder = (*XPendingCmd)(nil) + +func NewXPendingCmd(args ...interface{}) *XPendingCmd { + return &XPendingCmd{ + baseCmd: baseCmd{_args: args}, } +} + +func (cmd *XPendingCmd) Val() *XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (*XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.(*XPending) + return nil +} - stream, err := rd.ReadStringReply() +func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + count, err := rd.ReadIntReply() if err != nil { return nil, err } - v, err := rd.ReadArrayReply(xMessageSliceParser) - if err != nil { + lower, err := rd.ReadString() + if err != nil && err != Nil { return nil, err } - return &XStream{ - Stream: stream, - Messages: v.([]*XMessage), - }, nil + higher, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + pending := &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + for i := int64(0); i < n; i++ { + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + consumerName, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumerPending, err := rd.ReadInt() + if err != nil { + return nil, err + } + + if pending.Consumers == nil { + pending.Consumers = make(map[string]int64) + } + pending.Consumers[consumerName] = consumerPending + + return nil, nil + }) + if err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil && err != Nil { + return nil, err + } + + return pending, nil } //------------------------------------------------------------------------------ -type XMessageSliceCmd struct { - baseCmd +type XPendingExt struct { + Id string + Consumer string + Idle time.Duration + RetryCount int64 +} - val []*XMessage +type XPendingExtCmd struct { + baseCmd + val []XPendingExt } -var _ Cmder = (*XMessageSliceCmd)(nil) +var _ Cmder = (*XPendingExtCmd)(nil) -func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { - return &XMessageSliceCmd{ +func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { + return &XPendingExtCmd{ baseCmd: baseCmd{_args: args}, } } -func (cmd *XMessageSliceCmd) Val() []*XMessage { +func (cmd *XPendingExtCmd) Val() []XPendingExt { return cmd.val } -func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) { +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { return cmd.val, cmd.err } -func (cmd *XMessageSliceCmd) String() string { +func (cmd *XPendingExtCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { - var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser) +func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XMessage) + cmd.val = info.([]XPendingExt) return nil } -// Implements proto.MultiBulkParse -func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - msgs := make([]*XMessage, n) +func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XPendingExt, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xMessageParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumer, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + idle, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + retryCount, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + ret = append(ret, XPendingExt{ + Id: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + return nil, nil + }) if err != nil { return nil, err } - msgs[i] = v.(*XMessage) } - return msgs, nil + return ret, nil } -// Implements proto.MultiBulkParse -func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } +//------------------------------------------------------------------------------ - v, err := rd.ReadArrayReply(xKeyValueParser) - if err != nil { - return nil, err +//------------------------------------------------------------------------------ + +type ZSliceCmd struct { + baseCmd + + val []Z +} + +var _ Cmder = (*ZSliceCmd)(nil) + +func NewZSliceCmd(args ...interface{}) *ZSliceCmd { + return &ZSliceCmd{ + baseCmd: baseCmd{_args: args}, } +} + +func (cmd *ZSliceCmd) Val() []Z { + return cmd.val +} - return &XMessage{ - ID: id, - Values: v.(map[string]interface{}), - }, nil +func (cmd *ZSliceCmd) Result() ([]Z, error) { + return cmd.val, cmd.err +} + +func (cmd *ZSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(zSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]Z) + return nil } // Implements proto.MultiBulkParse -func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) { - values := make(map[string]interface{}, n) +func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + zz := make([]Z, n/2) for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() + var err error + + z := &zz[i/2] + + z.Member, err = rd.ReadString() if err != nil { return nil, err } - value, err := rd.ReadStringReply() + z.Score, err = rd.ReadFloatReply() if err != nil { return nil, err } - - values[key] = value } - return values, nil + return zz, nil } //------------------------------------------------------------------------------ -type ZSliceCmd struct { +type ZWithKeyCmd struct { baseCmd - val []Z + val ZWithKey } -var _ Cmder = (*ZSliceCmd)(nil) +var _ Cmder = (*ZWithKeyCmd)(nil) -func NewZSliceCmd(args ...interface{}) *ZSliceCmd { - return &ZSliceCmd{ +func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { + return &ZWithKeyCmd{ baseCmd: baseCmd{_args: args}, } } -func (cmd *ZSliceCmd) Val() []Z { +func (cmd *ZWithKeyCmd) Val() ZWithKey { return cmd.val } -func (cmd *ZSliceCmd) Result() ([]Z, error) { - return cmd.val, cmd.err +func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) { + return cmd.Val(), cmd.Err() } -func (cmd *ZSliceCmd) String() string { +func (cmd *ZWithKeyCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser) + v, cmd.err = rd.ReadArrayReply(zWithKeyParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]Z) + cmd.val = v.(ZWithKey) return nil } +// Implements proto.MultiBulkParse +func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 3 { + return nil, fmt.Errorf("got %d elements, expected 3", n) + } + + var z ZWithKey + var err error + + z.Key, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + return z, nil +} + //------------------------------------------------------------------------------ type ScanCmd struct { @@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.page) } -func (cmd *ScanCmd) readReply(cn *pool.Conn) error { - cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply() +func (cmd *ScanCmd) readReply(rd *proto.Reader) error { + cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() return cmd.err } @@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { +func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser) + v, cmd.err = rd.ReadArrayReply(clusterSlotsParser) if cmd.err != nil { return cmd.err } @@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { + slots := make([]ClusterSlot, n) + for i := 0; i < len(slots); i++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n < 2 { + err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + return nil, err + } + + start, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + end, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 && n != 3 { + err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) + return nil, err + } + + ip, err := rd.ReadString() + if err != nil { + return nil, err + } + + port, err := rd.ReadString() + if err != nil { + return nil, err + } + + nodes[j].Addr = net.JoinHostPort(ip, port) + + if n == 3 { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + nodes[j].Id = id + } + } + + slots[i] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } + } + return slots, nil +} + //------------------------------------------------------------------------------ // GeoLocation is used with GeoAdd to add geospatial location. @@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string { return cmdString(cmd, cmd.locations) } -func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) + v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) if cmd.err != nil { return cmd.err } @@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { return nil } +func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + var loc GeoLocation + var err error + + loc.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + if q.WithDist { + loc.Dist, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + if q.WithGeoHash { + loc.GeoHash, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + } + if q.WithCoord { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d coordinates, expected 2", n) + } + + loc.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + loc.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + + return &loc, nil + } +} + +func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + locs := make([]GeoLocation, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(newGeoLocationParser(q)) + if err != nil { + return nil, err + } + switch vv := v.(type) { + case string: + locs = append(locs, GeoLocation{ + Name: vv, + }) + case *GeoLocation: + locs = append(locs, *vv) + default: + return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) + } + } + return locs, nil + } +} + //------------------------------------------------------------------------------ type GeoPos struct { @@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string { return cmdString(cmd, cmd.positions) } -func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser) + v, cmd.err = rd.ReadArrayReply(geoPosSliceParser) if cmd.err != nil { return cmd.err } @@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { return nil } +func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + positions := make([]*GeoPos, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(geoPosParser) + if err != nil { + if err == Nil { + positions = append(positions, nil) + continue + } + return nil, err + } + switch v := v.(type) { + case *GeoPos: + positions = append(positions, v) + default: + return nil, fmt.Errorf("got %T, expected *GeoPos", v) + } + } + return positions, nil +} + +func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { + var pos GeoPos + var err error + + pos.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + pos.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + return &pos, nil +} + //------------------------------------------------------------------------------ type CommandInfo struct { @@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { +func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser) + v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser) if cmd.err != nil { return cmd.err } @@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} + +func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) + } + + cmd.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + + arity, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := rd.ReadReply(stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + //------------------------------------------------------------------------------ type cmdsInfoCache struct { diff --git a/src/dma/vendor/github.com/go-redis/redis/commands.go b/src/dma/vendor/github.com/go-redis/redis/commands.go index dddf8acd..653e4abe 100644 --- a/src/dma/vendor/github.com/go-redis/redis/commands.go +++ b/src/dma/vendor/github.com/go-redis/redis/commands.go @@ -8,13 +8,6 @@ import ( "github.com/go-redis/redis/internal" ) -func readTimeout(timeout time.Duration) time.Duration { - if timeout == 0 { - return 0 - } - return timeout + 10*time.Second -} - func usePrecise(dur time.Duration) bool { return dur < time.Second || dur%time.Second != 0 } @@ -172,16 +165,30 @@ type Cmdable interface { SRem(key string, members ...interface{}) *IntCmd SUnion(keys ...string) *StringSliceCmd SUnionStore(destination string, keys ...string) *IntCmd - XAdd(stream, id string, els map[string]interface{}) *StringCmd - XAddExt(opt *XAddExt) *StringCmd - XLen(key string) *IntCmd + XAdd(a *XAddArgs) *StringCmd + XDel(stream string, ids ...string) *IntCmd + XLen(stream string) *IntCmd XRange(stream, start, stop string) *XMessageSliceCmd XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd XRevRange(stream string, start, stop string) *XMessageSliceCmd XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd - XRead(streams ...string) *XStreamSliceCmd - XReadN(count int64, streams ...string) *XStreamSliceCmd - XReadExt(opt *XReadExt) *XStreamSliceCmd + XRead(a *XReadArgs) *XStreamSliceCmd + XReadStreams(streams ...string) *XStreamSliceCmd + XGroupCreate(stream, group, start string) *StatusCmd + XGroupCreateMkStream(stream, group, start string) *StatusCmd + XGroupSetID(stream, group, start string) *StatusCmd + XGroupDestroy(stream, group string) *IntCmd + XGroupDelConsumer(stream, group, consumer string) *IntCmd + XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd + XAck(stream, group string, ids ...string) *IntCmd + XPending(stream, group string) *XPendingCmd + XPendingExt(a *XPendingExtArgs) *XPendingExtCmd + XClaim(a *XClaimArgs) *XMessageSliceCmd + XClaimJustID(a *XClaimArgs) *StringSliceCmd + XTrim(key string, maxLen int64) *IntCmd + XTrimApprox(key string, maxLen int64) *IntCmd + BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd + BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -196,6 +203,8 @@ type Cmdable interface { ZLexCount(key, min, max string) *IntCmd ZIncrBy(key string, increment float64, member string) *FloatCmd ZInterStore(destination string, store ZStore, keys ...string) *IntCmd + ZPopMax(key string, count ...int64) *ZSliceCmd + ZPopMin(key string, count ...int64) *ZSliceCmd ZRange(key string, start, stop int64) *StringSliceCmd ZRangeWithScores(key string, start, stop int64) *ZSliceCmd ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd @@ -223,6 +232,7 @@ type Cmdable interface { ClientKillByFilter(keys ...string) *IntCmd ClientList() *StringCmd ClientPause(dur time.Duration) *BoolCmd + ClientID() *IntCmd ConfigGet(parameter string) *SliceCmd ConfigResetStat() *StatusCmd ConfigSet(parameter, value string) *StatusCmd @@ -260,6 +270,7 @@ type Cmdable interface { ClusterResetHard() *StatusCmd ClusterInfo() *StringCmd ClusterKeySlot(key string) *IntCmd + ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd ClusterCountFailureReports(nodeID string) *IntCmd ClusterCountKeysInSlot(slot int) *IntCmd ClusterDelSlots(slots ...int) *StatusCmd @@ -1300,7 +1311,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd { //------------------------------------------------------------------------------ -type XAddExt struct { +type XAddArgs struct { Stream string MaxLen int64 // MAXLEN N MaxLenApprox int64 // MAXLEN ~ N @@ -1308,40 +1319,42 @@ type XAddExt struct { Values map[string]interface{} } -func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd { - a := make([]interface{}, 0, 6+len(opt.Values)*2) - a = append(a, "xadd") - a = append(a, opt.Stream) - if opt.MaxLen > 0 { - a = append(a, "maxlen", opt.MaxLen) - } else if opt.MaxLenApprox > 0 { - a = append(a, "maxlen", "~", opt.MaxLenApprox) +func (c *cmdable) XAdd(a *XAddArgs) *StringCmd { + args := make([]interface{}, 0, 6+len(a.Values)*2) + args = append(args, "xadd") + args = append(args, a.Stream) + if a.MaxLen > 0 { + args = append(args, "maxlen", a.MaxLen) + } else if a.MaxLenApprox > 0 { + args = append(args, "maxlen", "~", a.MaxLenApprox) } - if opt.ID != "" { - a = append(a, opt.ID) + if a.ID != "" { + args = append(args, a.ID) } else { - a = append(a, "*") + args = append(args, "*") } - for k, v := range opt.Values { - a = append(a, k) - a = append(a, v) + for k, v := range a.Values { + args = append(args, k) + args = append(args, v) } - cmd := NewStringCmd(a...) + cmd := NewStringCmd(args...) c.process(cmd) return cmd } -func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd { - return c.XAddExt(&XAddExt{ - Stream: stream, - ID: id, - Values: values, - }) +func (c *cmdable) XDel(stream string, ids ...string) *IntCmd { + args := []interface{}{"xdel", stream} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd } -func (c *cmdable) XLen(key string) *IntCmd { - cmd := NewIntCmd("xlen", key) +func (c *cmdable) XLen(stream string) *IntCmd { + cmd := NewIntCmd("xlen", stream) c.process(cmd) return cmd } @@ -1370,55 +1383,190 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS return cmd } -type XReadExt struct { +type XReadArgs struct { Streams []string Count int64 Block time.Duration } -func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd { - a := make([]interface{}, 0, 5+len(opt.Streams)) - a = append(a, "xread") - if opt != nil { - if opt.Count > 0 { - a = append(a, "count") - a = append(a, opt.Count) - } - if opt.Block >= 0 { - a = append(a, "block") - a = append(a, int64(opt.Block/time.Millisecond)) - } +func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 5+len(a.Streams)) + args = append(args, "xread") + if a.Count > 0 { + args = append(args, "count") + args = append(args, a.Count) + } + if a.Block >= 0 { + args = append(args, "block") + args = append(args, int64(a.Block/time.Millisecond)) } - a = append(a, "streams") - for _, s := range opt.Streams { - a = append(a, s) + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) } - cmd := NewXStreamSliceCmd(a...) + cmd := NewXStreamSliceCmd(args...) + if a.Block >= 0 { + cmd.setReadTimeout(a.Block) + } c.process(cmd) return cmd } -func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ +func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd { + return c.XRead(&XReadArgs{ Streams: streams, Block: -1, }) } -func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ - Streams: streams, - Count: count, - Block: -1, - }) +func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start) + c.process(cmd) + return cmd } -func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ - Streams: streams, - Block: block, - }) +func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream") + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "setid", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd { + cmd := NewIntCmd("xgroup", "destroy", stream, group) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { + cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer) + c.process(cmd) + return cmd +} + +type XReadGroupArgs struct { + Group string + Consumer string + // List of streams and ids. + Streams []string + Count int64 + Block time.Duration + NoAck bool +} + +func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 8+len(a.Streams)) + args = append(args, "xreadgroup", "group", a.Group, a.Consumer) + if a.Count > 0 { + args = append(args, "count", a.Count) + } + if a.Block >= 0 { + args = append(args, "block", int64(a.Block/time.Millisecond)) + } + if a.NoAck { + args = append(args, "noack") + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) + if a.Block >= 0 { + cmd.setReadTimeout(a.Block) + } + c.process(cmd) + return cmd +} + +func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd { + args := []interface{}{"xack", stream, group} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XPending(stream, group string) *XPendingCmd { + cmd := NewXPendingCmd("xpending", stream, group) + c.process(cmd) + return cmd +} + +type XPendingExtArgs struct { + Stream string + Group string + Start string + End string + Count int64 + Consumer string +} + +func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + if a.Consumer != "" { + args = append(args, a.Consumer) + } + cmd := NewXPendingExtCmd(args...) + c.process(cmd) + return cmd +} + +type XClaimArgs struct { + Stream string + Group string + Consumer string + MinIdle time.Duration + Messages []string +} + +func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd { + args := xClaimArgs(a) + cmd := NewXMessageSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd { + args := xClaimArgs(a) + args = append(args, "justid") + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func xClaimArgs(a *XClaimArgs) []interface{} { + args := make([]interface{}, 0, 4+len(a.Messages)) + args = append(args, + "xclaim", + a.Stream, + a.Group, a.Consumer, + int64(a.MinIdle/time.Millisecond)) + for _, id := range a.Messages { + args = append(args, id) + } + return args +} + +func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", maxLen) + c.process(cmd) + return cmd +} + +func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen) + c.process(cmd) + return cmd } //------------------------------------------------------------------------------ @@ -1429,6 +1577,12 @@ type Z struct { Member interface{} } +// ZWithKey represents sorted set member including the name of the key where it was popped. +type ZWithKey struct { + Z + Key string +} + // ZStore is used as an arg to ZInterStore and ZUnionStore. type ZStore struct { Weights []float64 @@ -1436,6 +1590,34 @@ type ZStore struct { Aggregate string } +// Redis `BZPOPMAX key [key ...] timeout` command. +func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmax" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +// Redis `BZPOPMIN key [key ...] timeout` command. +func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmin" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { for i, m := range members { a[n+2*i] = m.Score @@ -1574,6 +1756,46 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string) return cmd } +func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd { + args := []interface{}{ + "zpopmax", + key, + } + + switch len(count) { + case 0: + break + case 1: + args = append(args, count[0]) + default: + panic("too many arguments") + } + + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd { + args := []interface{}{ + "zpopmin", + key, + } + + switch len(count) { + case 0: + break + case 1: + args = append(args, count[0]) + default: + panic("too many arguments") + } + + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { args := []interface{}{ "zrange", @@ -1849,6 +2071,24 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd { return cmd } +func (c *cmdable) ClientID() *IntCmd { + cmd := NewIntCmd("client", "id") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientUnblock(id int64) *IntCmd { + cmd := NewIntCmd("client", "unblock", id) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd { + cmd := NewIntCmd("client", "unblock", id, "error") + c.process(cmd) + return cmd +} + // ClientSetName assigns a name to the connection. func (c *statefulCmdable) ClientSetName(name string) *BoolCmd { cmd := NewBoolCmd("client", "setname", name) @@ -2164,6 +2404,12 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd { return cmd } +func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd { + cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count) + c.process(cmd) + return cmd +} + func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd { cmd := NewIntCmd("cluster", "count-failure-reports", nodeID) c.process(cmd) diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/error.go b/src/dma/vendor/github.com/go-redis/redis/internal/error.go index e0ff8632..34f6bd4d 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/error.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/error.go @@ -8,9 +8,18 @@ import ( "github.com/go-redis/redis/internal/proto" ) -func IsRetryableError(err error, retryNetError bool) bool { - if IsNetworkError(err) { - return retryNetError +func IsRetryableError(err error, retryTimeout bool) bool { + if err == nil { + return false + } + if err == io.EOF { + return true + } + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + return retryTimeout + } + return true } s := err.Error() if s == "ERR max number of clients reached" { @@ -33,20 +42,13 @@ func IsRedisError(err error) bool { return ok } -func IsNetworkError(err error) bool { - if err == io.EOF { - return true - } - _, ok := err.(net.Error) - return ok -} - func IsBadConn(err error, allowTimeout bool) bool { if err == nil { return false } if IsRedisError(err) { - return strings.HasPrefix(err.Error(), "READONLY ") + // #790 + return IsReadOnlyError(err) } if allowTimeout { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { @@ -81,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { func IsLoadingError(err error) bool { return strings.HasPrefix(err.Error(), "LOADING ") } + +func IsReadOnlyError(err error) bool { + return strings.HasPrefix(err.Error(), "READONLY ") +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go index acaf3665..1095bfe5 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -13,19 +13,21 @@ var noDeadline = time.Time{} type Conn struct { netConn net.Conn - Rd *proto.Reader - Wb *proto.WriteBuffer + rd *proto.Reader + rdLocked bool + wr *proto.Writer - Inited bool - usedAt atomic.Value + InitedAt time.Time + pooled bool + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { cn := &Conn{ netConn: netConn, - Wb: proto.NewWriteBuffer(), } - cn.Rd = proto.NewReader(cn.netConn) + cn.rd = proto.NewReader(netConn) + cn.wr = proto.NewWriter(netConn) cn.SetUsedAt(time.Now()) return cn } @@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) { func (cn *Conn) SetNetConn(netConn net.Conn) { cn.netConn = netConn - cn.Rd.Reset(netConn) + cn.rd.Reset(netConn) + cn.wr.Reset(netConn) } -func (cn *Conn) IsStale(timeout time.Duration) bool { - return timeout > 0 && time.Since(cn.UsedAt()) > timeout -} - -func (cn *Conn) SetReadTimeout(timeout time.Duration) { +func (cn *Conn) setReadTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetReadDeadline(now.Add(timeout)) - } else { - cn.netConn.SetReadDeadline(noDeadline) + return cn.netConn.SetReadDeadline(now.Add(timeout)) } + return cn.netConn.SetReadDeadline(noDeadline) } -func (cn *Conn) SetWriteTimeout(timeout time.Duration) { +func (cn *Conn) setWriteTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetWriteDeadline(now.Add(timeout)) - } else { - cn.netConn.SetWriteDeadline(noDeadline) + return cn.netConn.SetWriteDeadline(now.Add(timeout)) } + return cn.netConn.SetWriteDeadline(noDeadline) } func (cn *Conn) Write(b []byte) (int, error) { @@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr { return cn.netConn.RemoteAddr() } +func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error { + _ = cn.setReadTimeout(timeout) + return fn(cn.rd) +} + +func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error { + _ = cn.setWriteTimeout(timeout) + + firstErr := fn(cn.wr) + err := cn.wr.Flush() + if err != nil && firstErr == nil { + firstErr = err + } + return firstErr +} + func (cn *Conn) Close() error { return cn.netConn.Close() } diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go index cab66904..9cecee8a 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -28,7 +28,6 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // deprecated - use IdleConns IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -53,6 +52,8 @@ type Options struct { OnClose func(*Conn) error PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -63,16 +64,16 @@ type ConnPool struct { dialErrorsNum uint32 // atomic - lastDialError error lastDialErrorMu sync.RWMutex + lastDialError error queue chan struct{} - connsMu sync.Mutex - conns []*Conn - - idleConnsMu sync.RWMutex - idleConns []*Conn + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int stats Stats @@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool { idleConns: make([]*Conn, 0, opt.PoolSize), } + for i := 0; i < opt.MinIdleConns; i++ { + p.checkMinIdleConns() + } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } @@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool { return p } +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go p.addIdleConn() + } +} + +func (p *ConnPool) addIdleConn() { + cn, err := p.newConn(true) + if err != nil { + return + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() +} + func (p *ConnPool) NewConn() (*Conn, error) { - cn, err := p.newConn() + return p._NewConn(false) +} + +func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) { + cn, err := p.newConn(pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) + if pooled { + if p.poolSize < p.opt.PoolSize { + p.poolSize++ + } else { + cn.pooled = false + } + } p.connsMu.Unlock() return cn, nil } -func (p *ConnPool) newConn() (*Conn, error) { +func (p *ConnPool) newConn(pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) { return nil, err } - return NewConn(netConn), nil + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil } func (p *ConnPool) tryDial() { @@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) { } for { - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.popIdle() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn == nil { break } - if cn.IsStale(p.opt.IdleTimeout) { - p.CloseConn(cn) + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) continue } @@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) { atomic.AddUint32(&p.stats.Misses, 1) - newcn, err := p.NewConn() + newcn, err := p._NewConn(true) if err != nil { p.freeTurn() return nil, err @@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn { idx := len(p.idleConns) - 1 cn := p.idleConns[idx] p.idleConns = p.idleConns[:idx] - + p.idleConnsLen-- + p.checkMinIdleConns() return cn } func (p *ConnPool) Put(cn *Conn) { - buf := cn.Rd.PeekBuffered() - if buf != nil { - internal.Logf("connection has unread data: %.100q", buf) + if !cn.pooled { p.Remove(cn) return } - p.idleConnsMu.Lock() + p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) - p.idleConnsMu.Unlock() + p.idleConnsLen++ + p.connsMu.Unlock() p.freeTurn() } @@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } break } } @@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error { // Len returns total number of connections. func (p *ConnPool) Len() int { p.connsMu.Lock() - l := len(p.conns) + n := len(p.conns) p.connsMu.Unlock() - return l + return n } -// FreeLen returns number of idle connections. +// IdleLen returns number of idle connections. func (p *ConnPool) IdleLen() int { - p.idleConnsMu.RLock() - l := len(p.idleConns) - p.idleConnsMu.RUnlock() - return l + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n } func (p *ConnPool) Stats() *Stats { @@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats { Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(idleLen), IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } @@ -349,11 +393,10 @@ func (p *ConnPool) Close() error { } } p.conns = nil - p.connsMu.Unlock() - - p.idleConnsMu.Lock() + p.poolSize = 0 p.idleConns = nil - p.idleConnsMu.Unlock() + p.idleConnsLen = 0 + p.connsMu.Unlock() return firstErr } @@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn { } cn := p.idleConns[0] - if !cn.IsStale(p.opt.IdleTimeout) { + if !p.isStaleConn(cn) { return nil } p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- return cn } @@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) { for { p.getTurn() - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.reapStaleConn() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn != nil { p.removeConn(cn) @@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) { atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go index 8c28c7b7..896b6f65 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go @@ -9,8 +9,6 @@ import ( "github.com/go-redis/redis/internal/util" ) -const bytesAllocLimit = 1024 * 1024 // 1mb - const ( ErrorReply = '-' StatusReply = '+' @@ -32,40 +30,23 @@ func (e RedisError) Error() string { return string(e) } type MultiBulkParse func(*Reader, int64) (interface{}, error) type Reader struct { - src *bufio.Reader - buf []byte + rd *bufio.Reader + _buf []byte } func NewReader(rd io.Reader) *Reader { return &Reader{ - src: bufio.NewReader(rd), - buf: make([]byte, 4096), + rd: bufio.NewReader(rd), + _buf: make([]byte, 64), } } func (r *Reader) Reset(rd io.Reader) { - r.src.Reset(rd) -} - -func (r *Reader) PeekBuffered() []byte { - if n := r.src.Buffered(); n != 0 { - b, _ := r.src.Peek(n) - return b - } - return nil -} - -func (r *Reader) ReadN(n int) ([]byte, error) { - b, err := readN(r.src, r.buf, n) - if err != nil { - return nil, err - } - r.buf = b - return b, nil + r.rd.Reset(rd) } func (r *Reader) ReadLine() ([]byte, error) { - line, isPrefix, err := r.src.ReadLine() + line, isPrefix, err := r.rd.ReadLine() if err != nil { return nil, err } @@ -91,11 +72,11 @@ func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { case ErrorReply: return nil, ParseErrorReply(line) case StatusReply: - return parseTmpStatusReply(line), nil + return string(line[1:]), nil case IntReply: return util.ParseInt(line[1:], 10, 64) case StringReply: - return r.readTmpBytesReply(line) + return r.readStringReply(line) case ArrayReply: n, err := parseArrayLen(line) if err != nil { @@ -121,47 +102,42 @@ func (r *Reader) ReadIntReply() (int64, error) { } } -func (r *Reader) ReadTmpBytesReply() ([]byte, error) { +func (r *Reader) ReadString() (string, error) { line, err := r.ReadLine() if err != nil { - return nil, err + return "", err } switch line[0] { case ErrorReply: - return nil, ParseErrorReply(line) + return "", ParseErrorReply(line) case StringReply: - return r.readTmpBytesReply(line) + return r.readStringReply(line) case StatusReply: - return parseTmpStatusReply(line), nil + return string(line[1:]), nil + case IntReply: + return string(line[1:]), nil default: - return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) + return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line) } } -func (r *Reader) ReadBytesReply() ([]byte, error) { - b, err := r.ReadTmpBytesReply() - if err != nil { - return nil, err +func (r *Reader) readStringReply(line []byte) (string, error) { + if isNilReply(line) { + return "", Nil } - cp := make([]byte, len(b)) - copy(cp, b) - return cp, nil -} -func (r *Reader) ReadStringReply() (string, error) { - b, err := r.ReadTmpBytesReply() + replyLen, err := strconv.Atoi(string(line[1:])) if err != nil { return "", err } - return string(b), nil -} -func (r *Reader) ReadFloatReply() (float64, error) { - b, err := r.ReadTmpBytesReply() + b := make([]byte, replyLen+2) + _, err = io.ReadFull(r.rd, b) if err != nil { - return 0, err + return "", err } - return util.ParseFloat(b, 64) + + return util.BytesToString(b[:replyLen]), nil } func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { @@ -219,7 +195,7 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { keys := make([]string, n) for i := int64(0); i < n; i++ { - key, err := r.ReadStringReply() + key, err := r.ReadString() if err != nil { return nil, 0, err } @@ -229,69 +205,71 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { return keys, cursor, err } -func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) { - if isNilReply(line) { - return nil, Nil - } - - replyLen, err := strconv.Atoi(string(line[1:])) +func (r *Reader) ReadInt() (int64, error) { + b, err := r.readTmpBytesReply() if err != nil { - return nil, err + return 0, err } + return util.ParseInt(b, 10, 64) +} - b, err := r.ReadN(replyLen + 2) +func (r *Reader) ReadUint() (uint64, error) { + b, err := r.readTmpBytesReply() if err != nil { - return nil, err + return 0, err } - return b[:replyLen], nil + return util.ParseUint(b, 10, 64) } -func (r *Reader) ReadInt() (int64, error) { - b, err := r.ReadTmpBytesReply() +func (r *Reader) ReadFloatReply() (float64, error) { + b, err := r.readTmpBytesReply() if err != nil { return 0, err } - return util.ParseInt(b, 10, 64) + return util.ParseFloat(b, 64) } -func (r *Reader) ReadUint() (uint64, error) { - b, err := r.ReadTmpBytesReply() +func (r *Reader) readTmpBytesReply() ([]byte, error) { + line, err := r.ReadLine() if err != nil { - return 0, err + return nil, err + } + switch line[0] { + case ErrorReply: + return nil, ParseErrorReply(line) + case StringReply: + return r._readTmpBytesReply(line) + case StatusReply: + return line[1:], nil + default: + return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) } - return util.ParseUint(b, 10, 64) } -// -------------------------------------------------------------------- +func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) { + if isNilReply(line) { + return nil, Nil + } -func readN(r io.Reader, b []byte, n int) ([]byte, error) { - if n == 0 && b == nil { - return make([]byte, 0), nil + replyLen, err := strconv.Atoi(string(line[1:])) + if err != nil { + return nil, err } - if cap(b) >= n { - b = b[:n] - _, err := io.ReadFull(r, b) - return b, err + buf := r.buf(replyLen + 2) + _, err = io.ReadFull(r.rd, buf) + if err != nil { + return nil, err } - b = b[:cap(b)] - pos := 0 - for pos < n { - diff := n - len(b) - if diff > bytesAllocLimit { - diff = bytesAllocLimit - } - b = append(b, make([]byte, diff)...) + return buf[:replyLen], nil +} - nn, err := io.ReadFull(r, b[pos:]) - if err != nil { - return nil, err - } - pos += nn +func (r *Reader) buf(n int) []byte { + if d := n - cap(r._buf); d > 0 { + r._buf = append(r._buf, make([]byte, d)...) } - - return b, nil + return r._buf[:n] } func isNilReply(b []byte) bool { @@ -304,10 +282,6 @@ func ParseErrorReply(line []byte) error { return RedisError(string(line[1:])) } -func parseTmpStatusReply(line []byte) []byte { - return line[1:] -} - func parseArrayLen(line []byte) (int64, error) { if isNilReply(line) { return 0, Nil diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go deleted file mode 100644 index 664f4c33..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go +++ /dev/null @@ -1,113 +0,0 @@ -package proto - -import ( - "encoding" - "fmt" - "strconv" -) - -type WriteBuffer struct { - b []byte -} - -func NewWriteBuffer() *WriteBuffer { - return &WriteBuffer{ - b: make([]byte, 0, 4096), - } -} - -func (w *WriteBuffer) Len() int { return len(w.b) } -func (w *WriteBuffer) Bytes() []byte { return w.b } -func (w *WriteBuffer) Reset() { w.b = w.b[:0] } - -func (w *WriteBuffer) Append(args []interface{}) error { - w.b = append(w.b, ArrayReply) - w.b = strconv.AppendUint(w.b, uint64(len(args)), 10) - w.b = append(w.b, '\r', '\n') - - for _, arg := range args { - if err := w.append(arg); err != nil { - return err - } - } - return nil -} - -func (w *WriteBuffer) append(val interface{}) error { - switch v := val.(type) { - case nil: - w.AppendString("") - case string: - w.AppendString(v) - case []byte: - w.AppendBytes(v) - case int: - w.AppendString(formatInt(int64(v))) - case int8: - w.AppendString(formatInt(int64(v))) - case int16: - w.AppendString(formatInt(int64(v))) - case int32: - w.AppendString(formatInt(int64(v))) - case int64: - w.AppendString(formatInt(v)) - case uint: - w.AppendString(formatUint(uint64(v))) - case uint8: - w.AppendString(formatUint(uint64(v))) - case uint16: - w.AppendString(formatUint(uint64(v))) - case uint32: - w.AppendString(formatUint(uint64(v))) - case uint64: - w.AppendString(formatUint(v)) - case float32: - w.AppendString(formatFloat(float64(v))) - case float64: - w.AppendString(formatFloat(v)) - case bool: - if v { - w.AppendString("1") - } else { - w.AppendString("0") - } - case encoding.BinaryMarshaler: - b, err := v.MarshalBinary() - if err != nil { - return err - } - w.AppendBytes(b) - default: - return fmt.Errorf( - "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val) - } - return nil -} - -func (w *WriteBuffer) AppendString(s string) { - w.b = append(w.b, StringReply) - w.b = strconv.AppendUint(w.b, uint64(len(s)), 10) - w.b = append(w.b, '\r', '\n') - w.b = append(w.b, s...) - w.b = append(w.b, '\r', '\n') -} - -func (w *WriteBuffer) AppendBytes(p []byte) { - w.b = append(w.b, StringReply) - w.b = strconv.AppendUint(w.b, uint64(len(p)), 10) - w.b = append(w.b, '\r', '\n') - w.b = append(w.b, p...) - w.b = append(w.b, '\r', '\n') -} - -func formatInt(n int64) string { - return strconv.FormatInt(n, 10) -} - -func formatUint(u uint64) string { - return strconv.FormatUint(u, 10) -} - -func formatFloat(f float64) string { - return strconv.FormatFloat(f, 'f', -1, 64) -} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go new file mode 100644 index 00000000..d106ce0e --- /dev/null +++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go @@ -0,0 +1,159 @@ +package proto + +import ( + "bufio" + "encoding" + "fmt" + "io" + "strconv" + + "github.com/go-redis/redis/internal/util" +) + +type Writer struct { + wr *bufio.Writer + + lenBuf []byte + numBuf []byte +} + +func NewWriter(wr io.Writer) *Writer { + return &Writer{ + wr: bufio.NewWriter(wr), + + lenBuf: make([]byte, 64), + numBuf: make([]byte, 64), + } +} + +func (w *Writer) WriteArgs(args []interface{}) error { + err := w.wr.WriteByte(ArrayReply) + if err != nil { + return err + } + + err = w.writeLen(len(args)) + if err != nil { + return err + } + + for _, arg := range args { + err := w.writeArg(arg) + if err != nil { + return err + } + } + + return nil +} + +func (w *Writer) writeLen(n int) error { + w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10) + w.lenBuf = append(w.lenBuf, '\r', '\n') + _, err := w.wr.Write(w.lenBuf) + return err +} + +func (w *Writer) writeArg(v interface{}) error { + switch v := v.(type) { + case nil: + return w.string("") + case string: + return w.string(v) + case []byte: + return w.bytes(v) + case int: + return w.int(int64(v)) + case int8: + return w.int(int64(v)) + case int16: + return w.int(int64(v)) + case int32: + return w.int(int64(v)) + case int64: + return w.int(v) + case uint: + return w.uint(uint64(v)) + case uint8: + return w.uint(uint64(v)) + case uint16: + return w.uint(uint64(v)) + case uint32: + return w.uint(uint64(v)) + case uint64: + return w.uint(v) + case float32: + return w.float(float64(v)) + case float64: + return w.float(v) + case bool: + if v { + return w.int(1) + } else { + return w.int(0) + } + case encoding.BinaryMarshaler: + b, err := v.MarshalBinary() + if err != nil { + return err + } + return w.bytes(b) + default: + return fmt.Errorf( + "redis: can't marshal %T (implement encoding.BinaryMarshaler)", v) + } +} + +func (w *Writer) bytes(b []byte) error { + err := w.wr.WriteByte(StringReply) + if err != nil { + return err + } + + err = w.writeLen(len(b)) + if err != nil { + return err + } + + _, err = w.wr.Write(b) + if err != nil { + return err + } + + return w.crlf() +} + +func (w *Writer) string(s string) error { + return w.bytes(util.StringToBytes(s)) +} + +func (w *Writer) uint(n uint64) error { + w.numBuf = strconv.AppendUint(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) int(n int64) error { + w.numBuf = strconv.AppendInt(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) float(f float64) error { + w.numBuf = strconv.AppendFloat(w.numBuf[:0], f, 'f', -1, 64) + return w.bytes(w.numBuf) +} + +func (w *Writer) crlf() error { + err := w.wr.WriteByte('\r') + if err != nil { + return err + } + return w.wr.WriteByte('\n') +} + +func (w *Writer) Reset(wr io.Writer) { + w.wr.Reset(wr) +} + +func (w *Writer) Flush() error { + return w.wr.Flush() +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go deleted file mode 100644 index 3b174172..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight - -import "sync" - -// call is an in-flight or completed Do call -type call struct { - wg sync.WaitGroup - val interface{} - err error -} - -// Group represents a class of work and forms a namespace in which -// units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - g.mu.Unlock() - c.wg.Wait() - return c.val, c.err - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - c.val, c.err = fn() - c.wg.Done() - - g.mu.Lock() - delete(g.m, key) - g.mu.Unlock() - - return c.val, c.err -} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go index cd891833..1b3060eb 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go @@ -5,3 +5,7 @@ package util func BytesToString(b []byte) string { return string(b) } + +func StringToBytes(s string) []byte { + return []byte(s) +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go index 93a89c55..c9868aac 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go @@ -10,3 +10,13 @@ import ( func BytesToString(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } + +// StringToBytes converts string to byte slice. +func StringToBytes(s string) []byte { + return *(*[]byte)(unsafe.Pointer( + &struct { + string + Cap int + }{s, len(s)}, + )) +} diff --git a/src/dma/vendor/github.com/go-redis/redis/options.go b/src/dma/vendor/github.com/go-redis/redis/options.go index 8a82d590..b6fabf3f 100644 --- a/src/dma/vendor/github.com/go-redis/redis/options.go +++ b/src/dma/vendor/github.com/go-redis/redis/options.go @@ -14,6 +14,17 @@ import ( "github.com/go-redis/redis/internal/pool" ) +// Limiter is the interface of a rate limiter or a circuit breaker. +type Limiter interface { + // Allow returns a nil if operation is allowed or an error otherwise. + // If operation is allowed client must report the result of operation + // whether is a success or a failure. + Allow() error + // ReportResult reports the result of previously allowed operation. + // nil indicates a success, non-nil error indicates a failure. + ReportResult(result error) +} + type Options struct { // The network type, either tcp or unix. // Default is tcp. @@ -48,7 +59,7 @@ type Options struct { // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. If reached, commands will fail - // with a timeout instead of blocking. + // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. If reached, commands will fail @@ -59,6 +70,12 @@ type Options struct { // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int + // Minimum number of idle connections which is useful when establishing + // new connection is slow. + MinIdleConns int + // Connection age at which client retires (closes) the connection. + // Default is to not close aged connections. + MaxConnAge time.Duration // Amount of time client waits for connection if all connections // are busy before returning an error. // Default is ReadTimeout + 1 second. @@ -69,7 +86,8 @@ type Options struct { IdleTimeout time.Duration // Frequency of idle checks made by idle connections reaper. // Default is 1 minute. -1 disables idle connections reaper, - // but idle connections are still discarded by the client. + // but idle connections are still discarded by the client + // if IdleTimeout is set. IdleCheckFrequency time.Duration // Enables read only queries on slave nodes. @@ -83,6 +101,9 @@ func (opt *Options) init() { if opt.Network == "" { opt.Network = "tcp" } + if opt.Addr == "" { + opt.Addr = "localhost:6379" + } if opt.Dialer == nil { opt.Dialer = func() (net.Conn, error) { netDialer := &net.Dialer{ @@ -196,6 +217,8 @@ func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{ Dialer: opt.Dialer, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, diff --git a/src/dma/vendor/github.com/go-redis/redis/parser.go b/src/dma/vendor/github.com/go-redis/redis/parser.go deleted file mode 100644 index f0dc67f0..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/parser.go +++ /dev/null @@ -1,394 +0,0 @@ -package redis - -import ( - "fmt" - "net" - "strconv" - "time" - - "github.com/go-redis/redis/internal/proto" -) - -// Implements proto.MultiBulkParse -func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { - vals := make([]interface{}, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(sliceParser) - if err != nil { - if err == Nil { - vals = append(vals, nil) - continue - } - if err, ok := err.(proto.RedisError); ok { - vals = append(vals, err) - continue - } - return nil, err - } - - switch v := v.(type) { - case []byte: - vals = append(vals, string(v)) - default: - vals = append(vals, v) - } - } - return vals, nil -} - -// Implements proto.MultiBulkParse -func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - bools := make([]bool, 0, n) - for i := int64(0); i < n; i++ { - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - bools = append(bools, n == 1) - } - return bools, nil -} - -// Implements proto.MultiBulkParse -func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - ss := make([]string, 0, n) - for i := int64(0); i < n; i++ { - s, err := rd.ReadStringReply() - if err == Nil { - ss = append(ss, "") - } else if err != nil { - return nil, err - } else { - ss = append(ss, s) - } - } - return ss, nil -} - -// Implements proto.MultiBulkParse -func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]string, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - value, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - m[key] = value - } - return m, nil -} - -// Implements proto.MultiBulkParse -func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]int64, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - m[key] = n - } - return m, nil -} - -// Implements proto.MultiBulkParse -func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]struct{}, n) - for i := int64(0); i < n; i++ { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - m[key] = struct{}{} - } - return m, nil -} - -// Implements proto.MultiBulkParse -func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - zz := make([]Z, n/2) - for i := int64(0); i < n; i += 2 { - var err error - - z := &zz[i/2] - - z.Member, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - - z.Score, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - return zz, nil -} - -// Implements proto.MultiBulkParse -func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { - slots := make([]ClusterSlot, n) - for i := 0; i < len(slots); i++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n < 2 { - err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) - return nil, err - } - - start, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - end, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - nodes := make([]ClusterNode, n-2) - for j := 0; j < len(nodes); j++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 && n != 3 { - err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) - return nil, err - } - - ip, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - port, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) - - if n == 3 { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - nodes[j].Id = id - } - } - - slots[i] = ClusterSlot{ - Start: int(start), - End: int(end), - Nodes: nodes, - } - } - return slots, nil -} - -func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - var loc GeoLocation - var err error - - loc.Name, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - if q.WithDist { - loc.Dist, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - if q.WithGeoHash { - loc.GeoHash, err = rd.ReadIntReply() - if err != nil { - return nil, err - } - } - if q.WithCoord { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 { - return nil, fmt.Errorf("got %d coordinates, expected 2", n) - } - - loc.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - loc.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - - return &loc, nil - } -} - -func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - locs := make([]GeoLocation, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(newGeoLocationParser(q)) - if err != nil { - return nil, err - } - switch vv := v.(type) { - case []byte: - locs = append(locs, GeoLocation{ - Name: string(vv), - }) - case *GeoLocation: - locs = append(locs, *vv) - default: - return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) - } - } - return locs, nil - } -} - -func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { - var pos GeoPos - var err error - - pos.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - pos.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - return &pos, nil -} - -func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - positions := make([]*GeoPos, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(geoPosParser) - if err != nil { - if err == Nil { - positions = append(positions, nil) - continue - } - return nil, err - } - switch v := v.(type) { - case *GeoPos: - positions = append(positions, v) - default: - return nil, fmt.Errorf("got %T, expected *GeoPos", v) - } - } - return positions, nil -} - -func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { - var cmd CommandInfo - var err error - - if n != 6 { - return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) - } - - cmd.Name, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - - arity, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.Arity = int8(arity) - - flags, err := rd.ReadReply(stringSliceParser) - if err != nil { - return nil, err - } - cmd.Flags = flags.([]string) - - firstKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.FirstKeyPos = int8(firstKeyPos) - - lastKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.LastKeyPos = int8(lastKeyPos) - - stepCount, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.StepCount = int8(stepCount) - - for _, flag := range cmd.Flags { - if flag == "readonly" { - cmd.ReadOnly = true - break - } - } - - return &cmd, nil -} - -// Implements proto.MultiBulkParse -func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]*CommandInfo, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(commandInfoParser) - if err != nil { - return nil, err - } - vv := v.(*CommandInfo) - m[vv.Name] = vv - - } - return m, nil -} - -// Implements proto.MultiBulkParse -func timeParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d elements, expected 2", n) - } - - sec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - microsec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - return time.Unix(sec, microsec*1000), nil -} diff --git a/src/dma/vendor/github.com/go-redis/redis/pipeline.go b/src/dma/vendor/github.com/go-redis/redis/pipeline.go index ba852283..b3a8844a 100644 --- a/src/dma/vendor/github.com/go-redis/redis/pipeline.go +++ b/src/dma/vendor/github.com/go-redis/redis/pipeline.go @@ -10,6 +10,7 @@ type pipelineExecer func([]Cmder) error type Pipeliner interface { StatefulCmdable + Do(args ...interface{}) *Cmd Process(cmd Cmder) error Close() error Discard() error @@ -31,6 +32,12 @@ type Pipeline struct { closed bool } +func (c *Pipeline) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + _ = c.Process(cmd) + return cmd +} + // Process queues the cmd for later execution. func (c *Pipeline) Process(cmd Cmder) error { c.mu.Lock() diff --git a/src/dma/vendor/github.com/go-redis/redis/pubsub.go b/src/dma/vendor/github.com/go-redis/redis/pubsub.go index 2cfcd150..0afb47cd 100644 --- a/src/dma/vendor/github.com/go-redis/redis/pubsub.go +++ b/src/dma/vendor/github.com/go-redis/redis/pubsub.go @@ -1,15 +1,19 @@ package redis import ( + "errors" "fmt" "sync" "time" "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" ) -// PubSub implements Pub/Sub commands as described in +var errPingTimeout = errors.New("redis: ping timeout") + +// PubSub implements Pub/Sub commands bas described in // http://redis.io/topics/pubsub. Message receiving is NOT safe // for concurrent use by multiple goroutines. // @@ -46,15 +50,17 @@ func (c *PubSub) conn() (*pool.Conn, error) { return cn, err } -func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { +func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } - if c.cn != nil { return c.cn, nil } + channels := mapKeys(c.channels) + channels = append(channels, newChannels...) + cn, err := c.newConn(channels) if err != nil { return nil, err @@ -69,20 +75,24 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { return cn, nil } +func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error { + return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) +} + func (c *PubSub) resubscribe(cn *pool.Conn) error { var firstErr error if len(c.channels) > 0 { - channels := mapKeys(c.channels) - err := c._subscribe(cn, "subscribe", channels...) + err := c._subscribe(cn, "subscribe", mapKeys(c.channels)) if err != nil && firstErr == nil { firstErr = err } } if len(c.patterns) > 0 { - patterns := mapKeys(c.patterns) - err := c._subscribe(cn, "psubscribe", patterns...) + err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns)) if err != nil && firstErr == nil { firstErr = err } @@ -101,51 +111,48 @@ func mapKeys(m map[string]struct{}) []string { return s } -func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error { - args := make([]interface{}, 1+len(channels)) - args[0] = redisCmd - for i, channel := range channels { - args[1+i] = channel +func (c *PubSub) _subscribe( + cn *pool.Conn, redisCmd string, channels []string, +) error { + args := make([]interface{}, 0, 1+len(channels)) + args = append(args, redisCmd) + for _, channel := range channels { + args = append(args, channel) } cmd := NewSliceCmd(args...) - - cn.SetWriteTimeout(c.opt.WriteTimeout) - return writeCmd(cn, cmd) + return c.writeCmd(cn, cmd) } -func (c *PubSub) releaseConn(cn *pool.Conn, err error) { +func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) { c.mu.Lock() - c._releaseConn(cn, err) + c._releaseConn(cn, err, allowTimeout) c.mu.Unlock() } -func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { +func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) { if c.cn != cn { return } - if internal.IsBadConn(err, true) { - c._reconnect() + if internal.IsBadConn(err, allowTimeout) { + c._reconnect(err) } } -func (c *PubSub) _closeTheCn() error { - var err error - if c.cn != nil { - err = c.closeConn(c.cn) - c.cn = nil - } - return err -} - -func (c *PubSub) reconnect() { - c.mu.Lock() - c._reconnect() - c.mu.Unlock() +func (c *PubSub) _reconnect(reason error) { + _ = c._closeTheCn(reason) + _, _ = c._conn(nil) } -func (c *PubSub) _reconnect() { - _ = c._closeTheCn() - _, _ = c._conn(nil) +func (c *PubSub) _closeTheCn(reason error) error { + if c.cn == nil { + return nil + } + if !c.closed { + internal.Logf("redis: discarding bad PubSub connection: %s", reason) + } + err := c.closeConn(c.cn) + c.cn = nil + return err } func (c *PubSub) Close() error { @@ -158,7 +165,7 @@ func (c *PubSub) Close() error { c.closed = true close(c.exit) - err := c._closeTheCn() + err := c._closeTheCn(pool.ErrClosed) return err } @@ -172,8 +179,8 @@ func (c *PubSub) Subscribe(channels ...string) error { if c.channels == nil { c.channels = make(map[string]struct{}) } - for _, channel := range channels { - c.channels[channel] = struct{}{} + for _, s := range channels { + c.channels[s] = struct{}{} } return err } @@ -188,8 +195,8 @@ func (c *PubSub) PSubscribe(patterns ...string) error { if c.patterns == nil { c.patterns = make(map[string]struct{}) } - for _, pattern := range patterns { - c.patterns[pattern] = struct{}{} + for _, s := range patterns { + c.patterns[s] = struct{}{} } return err } @@ -200,10 +207,10 @@ func (c *PubSub) Unsubscribe(channels ...string) error { c.mu.Lock() defer c.mu.Unlock() - err := c.subscribe("unsubscribe", channels...) for _, channel := range channels { delete(c.channels, channel) } + err := c.subscribe("unsubscribe", channels...) return err } @@ -213,10 +220,10 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error { c.mu.Lock() defer c.mu.Unlock() - err := c.subscribe("punsubscribe", patterns...) for _, pattern := range patterns { delete(c.patterns, pattern) } + err := c.subscribe("punsubscribe", patterns...) return err } @@ -226,8 +233,8 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error { return err } - err = c._subscribe(cn, redisCmd, channels...) - c._releaseConn(cn, err) + err = c._subscribe(cn, redisCmd, channels) + c._releaseConn(cn, err, false) return err } @@ -243,9 +250,8 @@ func (c *PubSub) Ping(payload ...string) error { return err } - cn.SetWriteTimeout(c.opt.WriteTimeout) - err = writeCmd(cn, cmd) - c.releaseConn(cn, err) + err = c.writeCmd(cn, cmd) + c.releaseConn(cn, err, false) return err } @@ -336,9 +342,11 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { return nil, err } - cn.SetReadTimeout(timeout) - err = c.cmd.readReply(cn) - c.releaseConn(cn, err) + err = cn.WithReader(timeout, func(rd *proto.Reader) error { + return c.cmd.readReply(rd) + }) + + c.releaseConn(cn, err, timeout > 0) if err != nil { return nil, err } @@ -432,21 +440,26 @@ func (c *PubSub) initChannel() { timer := time.NewTimer(timeout) timer.Stop() - var hasPing bool + healthy := true for { timer.Reset(timeout) select { case <-c.ping: - hasPing = true + healthy = true if !timer.Stop() { <-timer.C } case <-timer.C: - if hasPing { - hasPing = false - _ = c.Ping() + pingErr := c.Ping() + if healthy { + healthy = false } else { - c.reconnect() + if pingErr == nil { + pingErr = errPingTimeout + } + c.mu.Lock() + c._reconnect(pingErr) + c.mu.Unlock() } case <-c.exit: return diff --git a/src/dma/vendor/github.com/go-redis/redis/redis.go b/src/dma/vendor/github.com/go-redis/redis/redis.go index c0f142cc..aca30648 100644 --- a/src/dma/vendor/github.com/go-redis/redis/redis.go +++ b/src/dma/vendor/github.com/go-redis/redis/redis.go @@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) { type baseClient struct { opt *Options connPool pool.Pooler + limiter Limiter process func(Cmder) error processPipeline func([]Cmder) error @@ -50,7 +51,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { if err := c.initConn(cn); err != nil { _ = c.connPool.CloseConn(cn) return nil, err @@ -61,12 +62,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) { } func (c *baseClient) getConn() (*pool.Conn, error) { + if c.limiter != nil { + err := c.limiter.Allow() + if err != nil { + return nil, err + } + } + + cn, err := c._getConn() + if err != nil { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + return nil, err + } + return cn, nil +} + +func (c *baseClient) _getConn() (*pool.Conn, error) { cn, err := c.connPool.Get() if err != nil { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { err := c.initConn(cn) if err != nil { c.connPool.Remove(cn) @@ -77,18 +96,32 @@ func (c *baseClient) getConn() (*pool.Conn, error) { return cn, nil } -func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { +func (c *baseClient) releaseConn(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + if internal.IsBadConn(err, false) { c.connPool.Remove(cn) - return false + } else { + c.connPool.Put(cn) + } +} + +func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) } - c.connPool.Put(cn) - return true + if err == nil || internal.IsRedisError(err) { + c.connPool.Put(cn) + } else { + c.connPool.Remove(cn) + } } func (c *baseClient) initConn(cn *pool.Conn) error { - cn.Inited = true + cn.InitedAt = time.Now() if c.opt.Password == "" && c.opt.DB == 0 && @@ -123,8 +156,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } +// Do creates a Cmd from the args and processes the cmd. +func (c *baseClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + _ = c.Process(cmd) + return cmd +} + // WrapProcess wraps function that processes Redis commands. -func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { +func (c *baseClient) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { c.process = fn(c.process) } @@ -147,8 +189,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmd); err != nil { + err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) + if err != nil { c.releaseConn(cn, err) cmd.setErr(err) if internal.IsRetryableError(err, true) { @@ -157,8 +201,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - cn.SetReadTimeout(c.cmdTimeout(cmd)) - err = cmd.readReply(cn) + err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error { + return cmd.readReply(rd) + }) c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue @@ -176,7 +221,11 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { - return readTimeout(*timeout) + t := *timeout + if t == 0 { + return 0 + } + return t + 10*time.Second } return c.opt.ReadTimeout } @@ -232,35 +281,33 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e } canRetry, err := p(cn, cmds) - - if err == nil || internal.IsRedisError(err) { - c.connPool.Put(cn) - break - } - c.connPool.Remove(cn) + c.releaseConnStrict(cn, err) if !canRetry || !internal.IsRetryableError(err, true) { break } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmds...); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) + if err != nil { setCmdsErr(cmds, err) return true, err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - return true, pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return pipelineReadCmds(rd, cmds) + }) + return true, err } -func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { +func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } @@ -269,47 +316,50 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { } func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { setCmdsErr(cmds, err) return true, err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - if err := c.txPipelineReadQueued(cn, cmds); err != nil { - setCmdsErr(cmds, err) - return false, err - } - - return false, pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := txPipelineReadQueued(rd, cmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return false, err } -func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { +func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error { multiExec := make([]Cmder, 0, len(cmds)+2) multiExec = append(multiExec, NewStatusCmd("MULTI")) multiExec = append(multiExec, cmds...) multiExec = append(multiExec, NewSliceCmd("EXEC")) - return writeCmd(cn, multiExec...) + return writeCmd(wr, multiExec...) } -func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { +func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + err := statusCmd.readReply(rd) + if err != nil { return err } - for _ = range cmds { - err := statusCmd.readReply(cn) + for range cmds { + err = statusCmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr @@ -373,12 +423,12 @@ func (c *Client) WithContext(ctx context.Context) *Client { if ctx == nil { panic("nil context") } - c2 := c.copy() + c2 := c.clone() c2.ctx = ctx return c2 } -func (c *Client) copy() *Client { +func (c *Client) clone() *Client { cp := *c cp.init() return &cp @@ -389,6 +439,11 @@ func (c *Client) Options() *Options { return c.opt } +func (c *Client) SetLimiter(l Limiter) *Client { + c.limiter = l + return c +} + type PoolStats pool.Stats // PoolStats returns connection pool stats. @@ -437,6 +492,30 @@ func (c *Client) pubSub() *PubSub { // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. +// Note that this method does not wait on a response from Redis, so the +// subscription may not be active immediately. To force the connection to wait, +// you may call the Receive() method on the returned *PubSub like so: +// +// sub := client.Subscribe(queryResp) +// iface, err := sub.Receive() +// if err != nil { +// // handle error +// } +// +// // Should be *Subscription, but others are possible if other actions have been +// // taken on sub since it was created. +// switch iface.(type) { +// case *Subscription: +// // subscribe succeeded +// case *Message: +// // received first message +// case *Pong: +// // pong received +// default: +// // handle error +// } +// +// ch := sub.Channel() func (c *Client) Subscribe(channels ...string) *PubSub { pubsub := c.pubSub() if len(channels) > 0 { diff --git a/src/dma/vendor/github.com/go-redis/redis/result.go b/src/dma/vendor/github.com/go-redis/redis/result.go index e086e8e3..e438f260 100644 --- a/src/dma/vendor/github.com/go-redis/redis/result.go +++ b/src/dma/vendor/github.com/go-redis/redis/result.go @@ -53,7 +53,7 @@ func NewBoolResult(val bool, err error) *BoolCmd { // NewStringResult returns a StringCmd initialised with val and err for testing func NewStringResult(val string, err error) *StringCmd { var cmd StringCmd - cmd.val = []byte(val) + cmd.val = val cmd.setErr(err) return &cmd } diff --git a/src/dma/vendor/github.com/go-redis/redis/ring.go b/src/dma/vendor/github.com/go-redis/redis/ring.go index ef855115..250e5f64 100644 --- a/src/dma/vendor/github.com/go-redis/redis/ring.go +++ b/src/dma/vendor/github.com/go-redis/redis/ring.go @@ -68,6 +68,8 @@ type RingOptions struct { WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options { WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, @@ -315,12 +319,12 @@ func (c *ringShards) Close() error { //------------------------------------------------------------------------------ -// Ring is a Redis client that uses constistent hashing to distribute +// Ring is a Redis client that uses consistent hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // // Ring monitors the state of each shard and removes dead shards from -// the ring. When shard comes online it is added back to the ring. This +// the ring. When a shard comes online it is added back to the ring. This // gives you maximum availability and partition tolerance, but no // consistency between different shards or even clients. Each client // uses shards that are available to the client and does not do any @@ -338,6 +342,7 @@ type Ring struct { shards *ringShards cmdsInfoCache *cmdsInfoCache + process func(Cmder) error processPipeline func([]Cmder) error } @@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring { } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats { acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns } return &acc } @@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shards.GetByKey(firstKey) } -func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.ForEachShard(func(c *Client) error { - c.WrapProcess(fn) - return nil - }) +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *Ring) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { + c.process = fn(c.process) } func (c *Ring) Process(cmd Cmder) error { - shard, err := c.cmdShard(cmd) - if err != nil { - cmd.setErr(err) - return err + return c.process(cmd) +} + +func (c *Ring) defaultProcess(cmd Cmder) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + shard, err := c.cmdShard(cmd) + if err != nil { + cmd.setErr(err) + return err + } + + err = shard.Client.Process(cmd) + if err == nil { + return nil + } + if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { + return err + } } - return shard.Client.Process(cmd) + return cmd.Err() } func (c *Ring) Pipeline() Pipeliner { @@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } + var mu sync.Mutex var failedCmdsMap map[string][]Cmder + var wg sync.WaitGroup for hash, cmds := range cmdsMap { - shard, err := c.shards.GetByHash(hash) - if err != nil { - setCmdsErr(cmds, err) - continue - } + wg.Add(1) + go func(hash string, cmds []Cmder) { + defer wg.Done() + + shard, err := c.shards.GetByHash(hash) + if err != nil { + setCmdsErr(cmds, err) + return + } - cn, err := shard.Client.getConn() - if err != nil { - setCmdsErr(cmds, err) - continue - } + cn, err := shard.Client.getConn() + if err != nil { + setCmdsErr(cmds, err) + return + } - canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - if err == nil || internal.IsRedisError(err) { - shard.Client.connPool.Put(cn) - continue - } - shard.Client.connPool.Remove(cn) + canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) + shard.Client.releaseConnStrict(cn, err) - if canRetry && internal.IsRetryableError(err, true) { - if failedCmdsMap == nil { - failedCmdsMap = make(map[string][]Cmder) + if canRetry && internal.IsRetryableError(err, true) { + mu.Lock() + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) + } + failedCmdsMap[hash] = cmds + mu.Unlock() } - failedCmdsMap[hash] = cmds - } + }(hash, cmds) } + wg.Wait() if len(failedCmdsMap) == 0 { break } cmdsMap = failedCmdsMap } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *Ring) TxPipeline() Pipeliner { diff --git a/src/dma/vendor/github.com/go-redis/redis/sentinel.go b/src/dma/vendor/github.com/go-redis/redis/sentinel.go index 12c29a71..7cbb90bd 100644 --- a/src/dma/vendor/github.com/go-redis/redis/sentinel.go +++ b/src/dma/vendor/github.com/go-redis/redis/sentinel.go @@ -29,13 +29,17 @@ type FailoverOptions struct { Password string DB int - MaxRetries int + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { }, } c.baseClient.init() - c.setProcessor(c.Process) + c.cmdable.setProcessor(c.Process) return &c } @@ -115,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { return c } -func (c *SentinelClient) PubSub() *PubSub { +func (c *SentinelClient) pubSub() *PubSub { pubsub := &PubSub{ opt: c.opt, @@ -128,14 +132,52 @@ func (c *SentinelClient) PubSub() *PubSub { return pubsub } +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *SentinelClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *SentinelClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { - cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name) + cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name) c.Process(cmd) return cmd } func (c *SentinelClient) Sentinels(name string) *SliceCmd { - cmd := NewSliceCmd("SENTINEL", "sentinels", name) + cmd := NewSliceCmd("sentinel", "sentinels", name) + c.Process(cmd) + return cmd +} + +// Failover forces a failover as if the master was not reachable, and without +// asking for agreement to other Sentinels. +func (c *SentinelClient) Failover(name string) *StatusCmd { + cmd := NewStatusCmd("sentinel", "failover", name) + c.Process(cmd) + return cmd +} + +// Reset resets all the masters with matching name. The pattern argument is a +// glob-style pattern. The reset process clears any previous state in a master +// (including a failover in progress), and removes every slave and sentinel +// already discovered and associated with the master. +func (c *SentinelClient) Reset(pattern string) *IntCmd { + cmd := NewIntCmd("sentinel", "reset", pattern) c.Process(cmd) return cmd } @@ -152,79 +194,81 @@ type sentinelFailover struct { masterName string _masterAddr string sentinel *SentinelClient + pubsub *PubSub } -func (d *sentinelFailover) Close() error { - return d.resetSentinel() +func (c *sentinelFailover) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.sentinel != nil { + return c.closeSentinel() + } + return nil } -func (d *sentinelFailover) Pool() *pool.ConnPool { - d.poolOnce.Do(func() { - d.opt.Dialer = d.dial - d.pool = newConnPool(d.opt) +func (c *sentinelFailover) Pool() *pool.ConnPool { + c.poolOnce.Do(func() { + c.opt.Dialer = c.dial + c.pool = newConnPool(c.opt) }) - return d.pool + return c.pool } -func (d *sentinelFailover) dial() (net.Conn, error) { - addr, err := d.MasterAddr() +func (c *sentinelFailover) dial() (net.Conn, error) { + addr, err := c.MasterAddr() if err != nil { return nil, err } - return net.DialTimeout("tcp", addr, d.opt.DialTimeout) + return net.DialTimeout("tcp", addr, c.opt.DialTimeout) } -func (d *sentinelFailover) MasterAddr() (string, error) { - d.mu.Lock() - defer d.mu.Unlock() - - addr, err := d.masterAddr() +func (c *sentinelFailover) MasterAddr() (string, error) { + addr, err := c.masterAddr() if err != nil { return "", err } - d._switchMaster(addr) - + c.switchMaster(addr) return addr, nil } -func (d *sentinelFailover) masterAddr() (string, error) { - // Try last working sentinel. - if d.sentinel != nil { - addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result() - if err == nil { - addr := net.JoinHostPort(addr[0], addr[1]) - return addr, nil - } - - internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", - d.masterName, err) - d._resetSentinel() +func (c *sentinelFailover) masterAddr() (string, error) { + addr := c.getMasterAddr() + if addr != "" { + return addr, nil } - for i, sentinelAddr := range d.sentinelAddrs { + c.mu.Lock() + defer c.mu.Unlock() + + for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, - DialTimeout: d.opt.DialTimeout, - ReadTimeout: d.opt.ReadTimeout, - WriteTimeout: d.opt.WriteTimeout, + MaxRetries: c.opt.MaxRetries, + + DialTimeout: c.opt.DialTimeout, + ReadTimeout: c.opt.ReadTimeout, + WriteTimeout: c.opt.WriteTimeout, - PoolSize: d.opt.PoolSize, - PoolTimeout: d.opt.PoolTimeout, - IdleTimeout: d.opt.IdleTimeout, + PoolSize: c.opt.PoolSize, + PoolTimeout: c.opt.PoolTimeout, + IdleTimeout: c.opt.IdleTimeout, + IdleCheckFrequency: c.opt.IdleCheckFrequency, + + TLSConfig: c.opt.TLSConfig, }) - masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() + masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result() if err != nil { internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", - d.masterName, err) - sentinel.Close() + c.masterName, err) + _ = sentinel.Close() continue } // Push working sentinel to the top. - d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0] - d.setSentinel(sentinel) + c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] + c.setSentinel(sentinel) addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) return addr, nil @@ -233,17 +277,41 @@ func (d *sentinelFailover) masterAddr() (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (c *sentinelFailover) switchMaster(addr string) { - c.mu.Lock() - c._switchMaster(addr) - c.mu.Unlock() +func (c *sentinelFailover) getMasterAddr() string { + c.mu.RLock() + sentinel := c.sentinel + c.mu.RUnlock() + + if sentinel == nil { + return "" + } + + addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() + if err != nil { + internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", + c.masterName, err) + c.mu.Lock() + if c.sentinel == sentinel { + c.closeSentinel() + } + c.mu.Unlock() + return "" + } + + return net.JoinHostPort(addr[0], addr[1]) } -func (c *sentinelFailover) _switchMaster(addr string) { - if c._masterAddr == addr { +func (c *sentinelFailover) switchMaster(addr string) { + c.mu.RLock() + masterAddr := c._masterAddr + c.mu.RUnlock() + if masterAddr == addr { return } + c.mu.Lock() + defer c.mu.Unlock() + internal.Logf("sentinel: new master=%q addr=%q", c.masterName, addr) _ = c.Pool().Filter(func(cn *pool.Conn) bool { @@ -252,32 +320,36 @@ func (c *sentinelFailover) _switchMaster(addr string) { c._masterAddr = addr } -func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) { - d.discoverSentinels(sentinel) - d.sentinel = sentinel - go d.listen(sentinel) +func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { + c.discoverSentinels(sentinel) + c.sentinel = sentinel + + c.pubsub = sentinel.Subscribe("+switch-master") + go c.listen(c.pubsub) } -func (d *sentinelFailover) resetSentinel() error { - var err error - d.mu.Lock() - if d.sentinel != nil { - err = d._resetSentinel() +func (c *sentinelFailover) closeSentinel() error { + var firstErr error + + err := c.pubsub.Close() + if err != nil && firstErr == err { + firstErr = err } - d.mu.Unlock() - return err -} + c.pubsub = nil -func (d *sentinelFailover) _resetSentinel() error { - err := d.sentinel.Close() - d.sentinel = nil - return err + err = c.sentinel.Close() + if err != nil && firstErr == err { + firstErr = err + } + c.sentinel = nil + + return firstErr } -func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { - sentinels, err := sentinel.Sentinels(d.masterName).Result() +func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { + sentinels, err := sentinel.Sentinels(c.masterName).Result() if err != nil { - internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err) + internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) return } for _, sentinel := range sentinels { @@ -286,49 +358,33 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { key := vals[i].(string) if key == "name" { sentinelAddr := vals[i+1].(string) - if !contains(d.sentinelAddrs, sentinelAddr) { - internal.Logf( - "sentinel: discovered new sentinel=%q for master=%q", - sentinelAddr, d.masterName, - ) - d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr) + if !contains(c.sentinelAddrs, sentinelAddr) { + internal.Logf("sentinel: discovered new sentinel=%q for master=%q", + sentinelAddr, c.masterName) + c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) } } } } } -func (d *sentinelFailover) listen(sentinel *SentinelClient) { - pubsub := sentinel.PubSub() - defer pubsub.Close() - - err := pubsub.Subscribe("+switch-master") - if err != nil { - internal.Logf("sentinel: Subscribe failed: %s", err) - d.resetSentinel() - return - } - +func (c *sentinelFailover) listen(pubsub *PubSub) { + ch := pubsub.Channel() for { - msg, err := pubsub.ReceiveMessage() - if err != nil { - if err == pool.ErrClosed { - d.resetSentinel() - return - } - internal.Logf("sentinel: ReceiveMessage failed: %s", err) - continue + msg, ok := <-ch + if !ok { + break } switch msg.Channel { case "+switch-master": parts := strings.Split(msg.Payload, " ") - if parts[0] != d.masterName { + if parts[0] != c.masterName { internal.Logf("sentinel: ignore addr for master=%q", parts[0]) continue } addr := net.JoinHostPort(parts[3], parts[4]) - d.switchMaster(addr) + c.switchMaster(addr) } } } diff --git a/src/dma/vendor/github.com/go-redis/redis/tx.go b/src/dma/vendor/github.com/go-redis/redis/tx.go index 6a7da99d..fb3e6331 100644 --- a/src/dma/vendor/github.com/go-redis/redis/tx.go +++ b/src/dma/vendor/github.com/go-redis/redis/tx.go @@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx { return &tx } -// Watch prepares a transcaction and marks the keys to be watched +// Watch prepares a transaction and marks the keys to be watched // for conditional execution if there are any keys. // -// The transaction is automatically closed when the fn exits. +// The transaction is automatically closed when fn exits. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { tx := c.newTx() if len(keys) > 0 { diff --git a/src/dma/vendor/github.com/go-redis/redis/universal.go b/src/dma/vendor/github.com/go-redis/redis/universal.go index 9e30c81d..a6075624 100644 --- a/src/dma/vendor/github.com/go-redis/redis/universal.go +++ b/src/dma/vendor/github.com/go-redis/redis/universal.go @@ -12,35 +12,38 @@ type UniversalOptions struct { // of cluster/sentinel nodes. Addrs []string - // The sentinel master name. - // Only failover clients. - MasterName string - // Database to be selected after connecting to the server. // Only single-node and failover clients. DB int - // Only cluster clients. - - // Enables read only queries on slave nodes. - ReadOnly bool - - MaxRedirects int - RouteByLatency bool - - // Common options + // Common options. OnConnect func(*Conn) error - MaxRetries int Password string + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration TLSConfig *tls.Config + + // Only cluster clients. + + MaxRedirects int + ReadOnly bool + RouteByLatency bool + RouteRandomly bool + + // The sentinel master name. + // Only failover clients. + MasterName string } func (o *UniversalOptions) cluster() *ClusterOptions { @@ -49,22 +52,31 @@ func (o *UniversalOptions) cluster() *ClusterOptions { } return &ClusterOptions{ - Addrs: o.Addrs, + Addrs: o.Addrs, + OnConnect: o.OnConnect, + + Password: o.Password, + MaxRedirects: o.MaxRedirects, - RouteByLatency: o.RouteByLatency, ReadOnly: o.ReadOnly, + RouteByLatency: o.RouteByLatency, + RouteRandomly: o.RouteRandomly, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, - OnConnect: o.OnConnect, - MaxRetries: o.MaxRetries, - Password: o.Password, DialTimeout: o.DialTimeout, ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, - TLSConfig: o.TLSConfig, + + TLSConfig: o.TLSConfig, } } @@ -76,19 +88,27 @@ func (o *UniversalOptions) failover() *FailoverOptions { return &FailoverOptions{ SentinelAddrs: o.Addrs, MasterName: o.MasterName, - DB: o.DB, + OnConnect: o.OnConnect, + + DB: o.DB, + Password: o.Password, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, - OnConnect: o.OnConnect, - MaxRetries: o.MaxRetries, - Password: o.Password, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, - TLSConfig: o.TLSConfig, + + TLSConfig: o.TLSConfig, } } @@ -99,20 +119,28 @@ func (o *UniversalOptions) simple() *Options { } return &Options{ - Addr: addr, - DB: o.DB, + Addr: addr, + OnConnect: o.OnConnect, + + DB: o.DB, + Password: o.Password, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, - OnConnect: o.OnConnect, - MaxRetries: o.MaxRetries, - Password: o.Password, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, - TLSConfig: o.TLSConfig, + + TLSConfig: o.TLSConfig, } } |