aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/go-redis/redis/cluster.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/cluster.go499
1 files changed, 248 insertions, 251 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/cluster.go b/src/dma/vendor/github.com/go-redis/redis/cluster.go
index 7a1af143..0cecc62c 100644
--- a/src/dma/vendor/github.com/go-redis/redis/cluster.go
+++ b/src/dma/vendor/github.com/go-redis/redis/cluster.go
@@ -3,11 +3,11 @@ package redis
import (
"context"
"crypto/tls"
- "errors"
"fmt"
"math"
"math/rand"
"net"
+ "runtime"
"sort"
"sync"
"sync/atomic"
@@ -17,7 +17,6 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
- "github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@@ -49,14 +48,18 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
+ // Optional hook that is called when a new node is created.
+ OnNewNode func(*Client)
+
// Following options are copied from Options struct.
OnConnect func(*Conn) error
+ Password string
+
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
- Password string
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -64,6 +67,8 @@ type ClusterOptions struct {
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -78,10 +83,14 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8
}
- if opt.RouteByLatency || opt.RouteRandomly {
+ if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
opt.ReadOnly = true
}
+ if opt.PoolSize == 0 {
+ opt.PoolSize = 5 * runtime.NumCPU()
+ }
+
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
@@ -125,10 +134,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
- PoolSize: opt.PoolSize,
- PoolTimeout: opt.PoolTimeout,
- IdleTimeout: opt.IdleTimeout,
-
+ PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
+ PoolTimeout: opt.PoolTimeout,
+ IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
TLSConfig: opt.TLSConfig,
@@ -157,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency()
}
+ if clOpt.OnNewNode != nil {
+ clOpt.OnNewNode(node.Client)
+ }
+
return &node
}
@@ -228,8 +242,6 @@ type clusterNodes struct {
clusterAddrs []string
closed bool
- nodeCreateGroup singleflight.Group
-
_generation uint32 // atomic
}
@@ -332,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil
}
- v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
- node := newClusterNode(c.opt, addr)
- return node, nil
- })
-
c.mu.Lock()
defer c.mu.Unlock()
@@ -346,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr]
if ok {
- _ = v.(*clusterNode).Close()
return node, err
}
- node = v.(*clusterNode)
+
+ node = newClusterNode(c.opt, addr)
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
- if err == nil {
- c.clusterAddrs = append(c.clusterAddrs, addr)
- }
+ c.clusterAddrs = append(c.clusterAddrs, addr)
c.allNodes[addr] = node
return node, err
@@ -429,13 +434,15 @@ func newClusterState(
createdAt: time.Now(),
}
- isLoopbackOrigin := isLoopbackAddr(origin)
+ originHost, _, _ := net.SplitHostPort(origin)
+ isLoopbackOrigin := isLoopback(originHost)
+
for _, slot := range slots {
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
- if !isLoopbackOrigin && useOriginAddr(origin, addr) {
- addr = origin
+ if !isLoopbackOrigin {
+ addr = replaceLoopbackHost(addr, originHost)
}
node, err := c.nodes.GetOrCreate(addr)
@@ -469,6 +476,33 @@ func newClusterState(
return &c, nil
}
+func replaceLoopbackHost(nodeAddr, originHost string) string {
+ nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+ if err != nil {
+ return nodeAddr
+ }
+
+ nodeIP := net.ParseIP(nodeHost)
+ if nodeIP == nil {
+ return nodeAddr
+ }
+
+ if !nodeIP.IsLoopback() {
+ return nodeAddr
+ }
+
+ // Use origin host which is not loopback and node port.
+ return net.JoinHostPort(originHost, nodePort)
+}
+
+func isLoopback(host string) bool {
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return true
+ }
+ return ip.IsLoopback()
+}
+
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) > 0 {
@@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
- break
+ return slave, nil
}
}
- return slave, nil
+
+ // All slaves are loading - use master.
+ return nodes[0], nil
}
}
@@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil
}
-func (c *clusterState) IsConsistent() bool {
- if c.nodes.opt.ClusterSlots != nil {
- return true
- }
- return len(c.Masters) <= len(c.Slaves)
-}
-
//------------------------------------------------------------------------------
type clusterStateHolder struct {
load func() (*clusterState, error)
- state atomic.Value
-
- firstErrMu sync.RWMutex
- firstErr error
-
+ state atomic.Value
reloading uint32 // atomic
}
@@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
}
func (c *clusterStateHolder) Reload() (*clusterState, error) {
- state, err := c.reload()
- if err != nil {
- return nil, err
- }
- if !state.IsConsistent() {
- time.AfterFunc(time.Second, c.LazyReload)
- }
- return state, nil
-}
-
-func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
- c.firstErrMu.Lock()
- if c.firstErr == nil {
- c.firstErr = err
- }
- c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- for {
- state, err := c.reload()
- if err != nil {
- return
- }
- time.Sleep(100 * time.Millisecond)
- if state.IsConsistent() {
- return
- }
+ _, err := c.Reload()
+ if err != nil {
+ return
}
+ time.Sleep(100 * time.Millisecond)
}()
}
@@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
}
return state, nil
}
-
- c.firstErrMu.RLock()
- err := c.firstErr
- c.firstErrMu.RUnlock()
- if err != nil {
- return nil, err
- }
-
- return nil, errors.New("redis: cluster has no state")
+ return c.Reload()
}
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.processTxPipeline = c.defaultProcessTxPipeline
c.init()
-
- _, _ = c.state.Reload()
- _, _ = c.cmdsInfoCache.Get()
-
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
-// ReloadState reloads cluster state. It calls ClusterSlots func
+func (c *ClusterClient) init() {
+ c.cmdable.setProcessor(c.Process)
+}
+
+// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
-func (c *ClusterClient) init() {
- c.cmdable.setProcessor(c.Process)
-}
-
func (c *ClusterClient) Context() context.Context {
if c.ctx != nil {
return c.ctx
@@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int {
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+ args := cmd.Args()
+ if args[0] == "cluster" && args[1] == "getkeysinslot" {
+ return args[2].(int)
+ }
+
cmdInfo := c.cmdInfo(cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
@@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
}
cmdInfo := c.cmdInfo(cmd.Name())
- slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
+ slot := c.cmdSlot(cmd)
- if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
+ if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot)
return slot, node, err
@@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if err == nil {
break
}
-
- if internal.IsRetryableError(err, true) {
+ if err != Nil {
c.state.LazyReload()
- continue
}
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
- c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
@@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
- if err == pool.ErrClosed {
+ if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node, err = c.slotMasterNode(slot)
if err != nil {
return err
@@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
+ if internal.IsRetryableError(err, true) {
+ continue
+ }
+
return err
}
@@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *ClusterClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
func (c *ClusterClient) WrapProcess(
fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
@@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
if err == nil {
break
}
+ if err != Nil {
+ c.state.LazyReload()
+ }
- // If slave is loading - read from master.
+ // If slave is loading - pick another node.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
- continue
- }
-
- if internal.IsRetryableError(err, true) {
- c.state.LazyReload()
-
- // First retry the same node.
- if attempt == 0 {
- continue
- }
-
- // Second try random node.
- node, err = c.nodes.Random()
- if err != nil {
- break
- }
+ node = nil
continue
}
@@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
- c.state.LazyReload()
-
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
@@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
continue
}
- if err == pool.ErrClosed {
+ if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node = nil
continue
}
+ if internal.IsRetryableError(err, true) {
+ // First retry the same node.
+ if attempt == 0 {
+ continue
+ }
+
+ // Second try random node.
+ node, err = c.nodes.Random()
+ if err != nil {
+ break
+ }
+ continue
+ }
+
break
}
@@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline(
}
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
- cmdsMap, err := c.mapCmdsByNode(cmds)
+ cmdsMap := newCmdsMap()
+ err := c.mapCmdsByNode(cmds, cmdsMap)
if err != nil {
setCmdsErr(cmds, err)
return err
@@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- failedCmds := make(map[*clusterNode][]Cmder)
+ failedCmds := newCmdsMap()
+ var wg sync.WaitGroup
- for node, cmds := range cmdsMap {
- cn, err := node.Client.getConn()
- if err != nil {
- if err == pool.ErrClosed {
- c.remapCmds(cmds, failedCmds)
- } else {
- setCmdsErr(cmds, err)
+ for node, cmds := range cmdsMap.m {
+ wg.Add(1)
+ go func(node *clusterNode, cmds []Cmder) {
+ defer wg.Done()
+
+ cn, err := node.Client.getConn()
+ if err != nil {
+ if err == pool.ErrClosed {
+ c.mapCmdsByNode(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
+ return
}
- continue
- }
- err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
- if err == nil || internal.IsRedisError(err) {
- node.Client.connPool.Put(cn)
- } else {
- node.Client.connPool.Remove(cn)
- }
+ err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+ node.Client.releaseConnStrict(cn, err)
+ }(node, cmds)
}
- if len(failedCmds) == 0 {
+ wg.Wait()
+ if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
-func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
+type cmdsMap struct {
+ mu sync.Mutex
+ m map[*clusterNode][]Cmder
+}
+
+func newCmdsMap() *cmdsMap {
+ return &cmdsMap{
+ m: make(map[*clusterNode][]Cmder),
+ }
+}
+
+func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
state, err := c.state.Get()
if err != nil {
setCmdsErr(cmds, err)
- return nil, err
+ return err
}
- cmdsMap := make(map[*clusterNode][]Cmder)
cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
var node *clusterNode
@@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
node, err = state.slotMasterNode(slot)
}
if err != nil {
- return nil, err
+ return err
}
- cmdsMap[node] = append(cmdsMap[node], cmd)
+ cmdsMap.mu.Lock()
+ cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
+ cmdsMap.mu.Unlock()
}
- return cmdsMap, nil
+ return nil
}
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
return true
}
-func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
- remappedCmds, err := c.mapCmdsByNode(cmds)
- if err != nil {
- setCmdsErr(cmds, err)
- return
- }
-
- for node, cmds := range remappedCmds {
- failedCmds[node] = cmds
- }
-}
-
func (c *ClusterClient) pipelineProcessCmds(
- node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
-
- err := writeCmd(cn, cmds...)
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
if err != nil {
setCmdsErr(cmds, err)
- failedCmds[node] = cmds
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = cmds
+ failedCmds.mu.Unlock()
return err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- return c.pipelineReadCmds(cn, cmds, failedCmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return c.pipelineReadCmds(node, rd, cmds, failedCmds)
+ })
+ return err
}
func (c *ClusterClient) pipelineReadCmds(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
+ var firstErr error
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err == nil {
continue
}
@@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds(
continue
}
- return err
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], cmd)
+ failedCmds.mu.Unlock()
+ if firstErr == nil {
+ firstErr = err
+ }
}
- return nil
+ return firstErr
}
func (c *ClusterClient) checkMovedErr(
- cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+ cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
moved, ask, addr := internal.IsMovedError(err)
@@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
- failedCmds[node] = append(failedCmds[node], cmd)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], cmd)
+ failedCmds.mu.Unlock()
return true
}
@@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
- failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
+ failedCmds.mu.Unlock()
return true
}
@@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- failedCmds := make(map[*clusterNode][]Cmder)
+ failedCmds := newCmdsMap()
+ var wg sync.WaitGroup
for node, cmds := range cmdsMap {
- cn, err := node.Client.getConn()
- if err != nil {
- if err == pool.ErrClosed {
- c.remapCmds(cmds, failedCmds)
- } else {
- setCmdsErr(cmds, err)
+ wg.Add(1)
+ go func(node *clusterNode, cmds []Cmder) {
+ defer wg.Done()
+
+ cn, err := node.Client.getConn()
+ if err != nil {
+ if err == pool.ErrClosed {
+ c.mapCmdsByNode(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
+ return
}
- continue
- }
- err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
- if err == nil || internal.IsRedisError(err) {
- node.Client.connPool.Put(cn)
- } else {
- node.Client.connPool.Remove(cn)
- }
+ err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
+ node.Client.releaseConnStrict(cn, err)
+ }(node, cmds)
}
- if len(failedCmds) == 0 {
+ wg.Wait()
+ if len(failedCmds.m) == 0 {
break
}
- cmdsMap = failedCmds
+ cmdsMap = failedCmds.m
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
@@ -1429,37 +1453,41 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
}
func (c *ClusterClient) txPipelineProcessCmds(
- node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
- setCmdsErr(cmds, err)
- failedCmds[node] = cmds
- return err
- }
-
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = cmds
+ failedCmds.mu.Unlock()
return err
}
- return pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return err
}
func (c *ClusterClient) txPipelineReadQueued(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ if err := statusCmd.readReply(rd); err != nil {
return err
}
for _, cmd := range cmds {
- err := statusCmd.readReply(cn)
+ err := statusCmd.readReply(rd)
if err == nil {
continue
}
@@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued(
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil
}
-func (c *ClusterClient) pubSub(channels []string) *PubSub {
+func (c *ClusterClient) pubSub() *PubSub {
var node *clusterNode
pubsub := &PubSub{
opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) {
- if node == nil {
- var slot int
- if len(channels) > 0 {
- slot = hashtag.Slot(channels[0])
- } else {
- slot = -1
- }
+ if node != nil {
+ panic("node != nil")
+ }
- masterNode, err := c.slotMasterNode(slot)
- if err != nil {
- return nil, err
- }
- node = masterNode
+ slot := hashtag.Slot(channels[0])
+
+ var err error
+ node, err = c.slotMasterNode(slot)
+ if err != nil {
+ return nil, err
}
- return node.Client.newConn()
+
+ cn, err := node.Client.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ return cn, nil
},
closeConn: func(cn *pool.Conn) error {
- return node.Client.connPool.CloseConn(cn)
+ err := node.Client.connPool.CloseConn(cn)
+ node = nil
+ return err
},
}
pubsub.init()
+
return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
- pubsub := c.pubSub(channels)
+ pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
@@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
- pubsub := c.pubSub(channels)
+ pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
return pubsub
}
-func useOriginAddr(originAddr, nodeAddr string) bool {
- nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
- if err != nil {
- return false
- }
-
- nodeIP := net.ParseIP(nodeHost)
- if nodeIP == nil {
- return false
- }
-
- if !nodeIP.IsLoopback() {
- return false
- }
-
- _, originPort, err := net.SplitHostPort(originAddr)
- if err != nil {
- return false
- }
-
- return nodePort == originPort
-}
-
-func isLoopbackAddr(addr string) bool {
- host, _, err := net.SplitHostPort(addr)
- if err != nil {
- return false
- }
-
- ip := net.ParseIP(host)
- if ip == nil {
- return false
- }
-
- return ip.IsLoopback()
-}
-
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {