Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/cluster.go')
-rw-r--r--  src/dma/vendor/github.com/go-redis/redis/cluster.go  499
1 file changed, 248 insertions(+), 251 deletions(-)
diff --git a/src/dma/vendor/github.com/go-redis/redis/cluster.go b/src/dma/vendor/github.com/go-redis/redis/cluster.go
index 7a1af143..0cecc62c 100644
--- a/src/dma/vendor/github.com/go-redis/redis/cluster.go
+++ b/src/dma/vendor/github.com/go-redis/redis/cluster.go
@@ -3,11 +3,11 @@ package redis
 import (
 	"context"
 	"crypto/tls"
-	"errors"
 	"fmt"
 	"math"
 	"math/rand"
 	"net"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -17,7 +17,6 @@ import (
 	"github.com/go-redis/redis/internal/hashtag"
 	"github.com/go-redis/redis/internal/pool"
 	"github.com/go-redis/redis/internal/proto"
-	"github.com/go-redis/redis/internal/singleflight"
 )
 
 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@@ -49,14 +48,18 @@ type ClusterOptions struct {
 	// and Cluster.ReloadState to manually trigger state reloading.
 	ClusterSlots func() ([]ClusterSlot, error)
 
+	// Optional hook that is called when a new node is created.
+	OnNewNode func(*Client)
+
 	// Following options are copied from Options struct.
 
 	OnConnect func(*Conn) error
 
+	Password string
+
 	MaxRetries      int
 	MinRetryBackoff time.Duration
 	MaxRetryBackoff time.Duration
-	Password        string
 
 	DialTimeout time.Duration
 	ReadTimeout time.Duration
@@ -64,6 +67,8 @@ type ClusterOptions struct {
 	// PoolSize applies per cluster node and not for the whole cluster.
 	PoolSize           int
+	MinIdleConns       int
+	MaxConnAge         time.Duration
 	PoolTimeout        time.Duration
 	IdleTimeout        time.Duration
 	IdleCheckFrequency time.Duration
 
@@ -78,10 +83,14 @@ func (opt *ClusterOptions) init() {
 		opt.MaxRedirects = 8
 	}
 
-	if opt.RouteByLatency || opt.RouteRandomly {
+	if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
 		opt.ReadOnly = true
 	}
 
+	if opt.PoolSize == 0 {
+		opt.PoolSize = 5 * runtime.NumCPU()
+	}
+
 	switch opt.ReadTimeout {
 	case -1:
 		opt.ReadTimeout = 0
@@ -125,10 +134,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
 		ReadTimeout:  opt.ReadTimeout,
 		WriteTimeout: opt.WriteTimeout,
 
-		PoolSize:    opt.PoolSize,
-		PoolTimeout: opt.PoolTimeout,
-		IdleTimeout: opt.IdleTimeout,
-
+		PoolSize:           opt.PoolSize,
+		MinIdleConns:       opt.MinIdleConns,
+		MaxConnAge:         opt.MaxConnAge,
+		PoolTimeout:        opt.PoolTimeout,
+		IdleTimeout:        opt.IdleTimeout,
 		IdleCheckFrequency: disableIdleCheck,
 
 		TLSConfig: opt.TLSConfig,
@@ -157,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
 		go node.updateLatency()
 	}
 
+	if clOpt.OnNewNode != nil {
+		clOpt.OnNewNode(node.Client)
+	}
+
 	return &node
 }
@@ -228,8 +242,6 @@ type clusterNodes struct {
 	clusterAddrs []string
 	closed       bool
 
-	nodeCreateGroup singleflight.Group
-
 	_generation uint32 // atomic
 }
@@ -332,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
 		return node, nil
 	}
 
-	v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
-		node := newClusterNode(c.opt, addr)
-		return node, nil
-	})
-
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -346,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
 	node, ok := c.allNodes[addr]
 	if ok {
-		_ = v.(*clusterNode).Close()
 		return node, err
 	}
-	node = v.(*clusterNode)
+
+	node = newClusterNode(c.opt, addr)
 
 	c.allAddrs = appendIfNotExists(c.allAddrs, addr)
-	if err == nil {
-		c.clusterAddrs = append(c.clusterAddrs, addr)
-	}
+	c.clusterAddrs = append(c.clusterAddrs, addr)
 	c.allNodes[addr] = node
 
 	return node, err
@@ -429,13 +434,15 @@ func newClusterState(
 		createdAt: time.Now(),
 	}
 
-	isLoopbackOrigin := isLoopbackAddr(origin)
+	originHost, _, _ := net.SplitHostPort(origin)
+	isLoopbackOrigin := isLoopback(originHost)
+
 	for _, slot := range slots {
 		var nodes []*clusterNode
 		for i, slotNode := range slot.Nodes {
 			addr := slotNode.Addr
-			if !isLoopbackOrigin && useOriginAddr(origin, addr) {
-				addr = origin
+			if !isLoopbackOrigin {
+				addr = replaceLoopbackHost(addr, originHost)
 			}
 
 			node, err := c.nodes.GetOrCreate(addr)
@@ -469,6 +476,33 @@ func newClusterState(
 	return &c, nil
 }
 
+func replaceLoopbackHost(nodeAddr, originHost string) string {
+	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+	if err != nil {
+		return nodeAddr
+	}
+
+	nodeIP := net.ParseIP(nodeHost)
+	if nodeIP == nil {
+		return nodeAddr
+	}
+
+	if !nodeIP.IsLoopback() {
+		return nodeAddr
+	}
+
+	// Use origin host which is not loopback and node port.
+	return net.JoinHostPort(originHost, nodePort)
+}
+
+func isLoopback(host string) bool {
+	ip := net.ParseIP(host)
+	if ip == nil {
+		return true
+	}
+	return ip.IsLoopback()
+}
+
 func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
 	nodes := c.slotNodes(slot)
 	if len(nodes) > 0 {
@@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
 			n := rand.Intn(len(nodes)-1) + 1
 			slave = nodes[n]
 			if !slave.Loading() {
-				break
+				return slave, nil
 			}
 		}
-		return slave, nil
+
+		// All slaves are loading - use master.
+		return nodes[0], nil
 	}
 }
@@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
 	return nil
 }
 
-func (c *clusterState) IsConsistent() bool {
-	if c.nodes.opt.ClusterSlots != nil {
-		return true
-	}
-	return len(c.Masters) <= len(c.Slaves)
-}
-
 //------------------------------------------------------------------------------
 
 type clusterStateHolder struct {
 	load func() (*clusterState, error)
 
-	state atomic.Value
-
-	firstErrMu sync.RWMutex
-	firstErr   error
-
+	state     atomic.Value
 	reloading uint32 // atomic
 }
@@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
 }
 
 func (c *clusterStateHolder) Reload() (*clusterState, error) {
-	state, err := c.reload()
-	if err != nil {
-		return nil, err
-	}
-	if !state.IsConsistent() {
-		time.AfterFunc(time.Second, c.LazyReload)
-	}
-	return state, nil
-}
-
-func (c *clusterStateHolder) reload() (*clusterState, error) {
 	state, err := c.load()
 	if err != nil {
-		c.firstErrMu.Lock()
-		if c.firstErr == nil {
-			c.firstErr = err
-		}
-		c.firstErrMu.Unlock()
 		return nil, err
 	}
 	c.state.Store(state)
@@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() {
 	go func() {
 		defer atomic.StoreUint32(&c.reloading, 0)
 
-		for {
-			state, err := c.reload()
-			if err != nil {
-				return
-			}
-			time.Sleep(100 * time.Millisecond)
-			if state.IsConsistent() {
-				return
-			}
+		_, err := c.Reload()
+		if err != nil {
+			return
 		}
+		time.Sleep(100 * time.Millisecond)
 	}()
 }
@@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
 		}
 		return state, nil
 	}
-
-	c.firstErrMu.RLock()
-	err := c.firstErr
-	c.firstErrMu.RUnlock()
-	if err != nil {
-		return nil, err
-	}
-
-	return nil, errors.New("redis: cluster has no state")
+	return c.Reload()
 }
 
 func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
 	c.processTxPipeline = c.defaultProcessTxPipeline
 
 	c.init()
-
-	_, _ = c.state.Reload()
-	_, _ = c.cmdsInfoCache.Get()
-
 	if opt.IdleCheckFrequency > 0 {
 		go c.reaper(opt.IdleCheckFrequency)
 	}
@@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
 	return c
 }
 
-// ReloadState reloads cluster state. It calls ClusterSlots func
+func (c *ClusterClient) init() {
+	c.cmdable.setProcessor(c.Process)
+}
+
+// ReloadState reloads cluster state. If available it calls ClusterSlots func
 // to get cluster slots information.
 func (c *ClusterClient) ReloadState() error {
 	_, err := c.state.Reload()
 	return err
 }
 
-func (c *ClusterClient) init() {
-	c.cmdable.setProcessor(c.Process)
-}
-
 func (c *ClusterClient) Context() context.Context {
 	if c.ctx != nil {
 		return c.ctx
@@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int {
 }
 
 func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+	args := cmd.Args()
+	if args[0] == "cluster" && args[1] == "getkeysinslot" {
+		return args[2].(int)
+	}
+
 	cmdInfo := c.cmdInfo(cmd.Name())
 	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
 }
@@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
 	}
 
 	cmdInfo := c.cmdInfo(cmd.Name())
-	slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
+	slot := c.cmdSlot(cmd)
 
-	if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
+	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
 		if c.opt.RouteByLatency {
 			node, err := state.slotClosestNode(slot)
 			return slot, node, err
@@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
 		if err == nil {
 			break
 		}
-
-		if internal.IsRetryableError(err, true) {
+		if err != Nil {
 			c.state.LazyReload()
-			continue
 		}
 
 		moved, ask, addr := internal.IsMovedError(err)
 		if moved || ask {
-			c.state.LazyReload()
 			node, err = c.nodes.GetOrCreate(addr)
 			if err != nil {
 				return err
@@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
 			continue
 		}
 
-		if err == pool.ErrClosed {
+		if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
 			node, err = c.slotMasterNode(slot)
 			if err != nil {
 				return err
@@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
 			continue
 		}
 
+		if internal.IsRetryableError(err, true) {
+			continue
+		}
+
 		return err
 	}
@@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error {
 	return c.nodes.Close()
 }
 
+// Do creates a Cmd from the args and processes the cmd.
+func (c *ClusterClient) Do(args ...interface{}) *Cmd {
+	cmd := NewCmd(args...)
+	c.Process(cmd)
+	return cmd
+}
+
 func (c *ClusterClient) WrapProcess(
 	fn func(oldProcess func(Cmder) error) func(Cmder) error,
 ) {
@@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
 		if err == nil {
 			break
 		}
+		if err != Nil {
+			c.state.LazyReload()
+		}
 
-		// If slave is loading - read from master.
+		// If slave is loading - pick another node.
 		if c.opt.ReadOnly && internal.IsLoadingError(err) {
 			node.MarkAsLoading()
-			continue
-		}
-
-		if internal.IsRetryableError(err, true) {
-			c.state.LazyReload()
-
-			// First retry the same node.
-			if attempt == 0 {
-				continue
-			}
-
-			// Second try random node.
-			node, err = c.nodes.Random()
-			if err != nil {
-				break
-			}
+			node = nil
 			continue
 		}
@@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
 		var addr string
 		moved, ask, addr = internal.IsMovedError(err)
 		if moved || ask {
-			c.state.LazyReload()
-
 			node, err = c.nodes.GetOrCreate(addr)
 			if err != nil {
 				break
@@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
 			continue
 		}
 
-		if err == pool.ErrClosed {
+		if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
 			node = nil
 			continue
 		}
 
+		if internal.IsRetryableError(err, true) {
+			// First retry the same node.
+			if attempt == 0 {
+				continue
+			}
+
+			// Second try random node.
+			node, err = c.nodes.Random()
+			if err != nil {
+				break
+			}
+			continue
+		}
+
 		break
 	}
@@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
 		acc.Timeouts += s.Timeouts
 
 		acc.TotalConns += s.TotalConns
-		acc.FreeConns += s.FreeConns
+		acc.IdleConns += s.IdleConns
 		acc.StaleConns += s.StaleConns
 	}
@@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
 		acc.Timeouts += s.Timeouts
 
 		acc.TotalConns += s.TotalConns
-		acc.FreeConns += s.FreeConns
+		acc.IdleConns += s.IdleConns
 		acc.StaleConns += s.StaleConns
 	}
@@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline(
 }
 
 func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
-	cmdsMap, err := c.mapCmdsByNode(cmds)
+	cmdsMap := newCmdsMap()
+	err := c.mapCmdsByNode(cmds, cmdsMap)
 	if err != nil {
 		setCmdsErr(cmds, err)
 		return err
@@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
 			time.Sleep(c.retryBackoff(attempt))
 		}
 
-		failedCmds := make(map[*clusterNode][]Cmder)
+		failedCmds := newCmdsMap()
+		var wg sync.WaitGroup
 
-		for node, cmds := range cmdsMap {
-			cn, err := node.Client.getConn()
-			if err != nil {
-				if err == pool.ErrClosed {
-					c.remapCmds(cmds, failedCmds)
-				} else {
-					setCmdsErr(cmds, err)
+		for node, cmds := range cmdsMap.m {
+			wg.Add(1)
+			go func(node *clusterNode, cmds []Cmder) {
+				defer wg.Done()
+
+				cn, err := node.Client.getConn()
+				if err != nil {
+					if err == pool.ErrClosed {
+						c.mapCmdsByNode(cmds, failedCmds)
+					} else {
+						setCmdsErr(cmds, err)
+					}
+					return
 				}
-				continue
-			}
 
-			err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
-			if err == nil || internal.IsRedisError(err) {
-				node.Client.connPool.Put(cn)
-			} else {
-				node.Client.connPool.Remove(cn)
-			}
+				err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+				node.Client.releaseConnStrict(cn, err)
+			}(node, cmds)
 		}
 
-		if len(failedCmds) == 0 {
+		wg.Wait()
+		if len(failedCmds.m) == 0 {
 			break
 		}
 		cmdsMap = failedCmds
 	}
 
-	return firstCmdsErr(cmds)
+	return cmdsFirstErr(cmds)
 }
 
-func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
+type cmdsMap struct {
+	mu sync.Mutex
+	m  map[*clusterNode][]Cmder
+}
+
+func newCmdsMap() *cmdsMap {
+	return &cmdsMap{
+		m: make(map[*clusterNode][]Cmder),
+	}
+}
+
+func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
 	state, err := c.state.Get()
 	if err != nil {
 		setCmdsErr(cmds, err)
-		return nil, err
+		return err
 	}
 
-	cmdsMap := make(map[*clusterNode][]Cmder)
 	cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
 	for _, cmd := range cmds {
 		var node *clusterNode
@@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
 			node, err = state.slotMasterNode(slot)
 		}
 		if err != nil {
-			return nil, err
+			return err
 		}
-		cmdsMap[node] = append(cmdsMap[node], cmd)
+		cmdsMap.mu.Lock()
+		cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
+		cmdsMap.mu.Unlock()
 	}
-	return cmdsMap, nil
+	return nil
 }
 
 func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
 	return true
 }
 
-func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
-	remappedCmds, err := c.mapCmdsByNode(cmds)
-	if err != nil {
-		setCmdsErr(cmds, err)
-		return
-	}
-
-	for node, cmds := range remappedCmds {
-		failedCmds[node] = cmds
-	}
-}
-
 func (c *ClusterClient) pipelineProcessCmds(
-	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-
-	err := writeCmd(cn, cmds...)
+	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+		return writeCmd(wr, cmds...)
+	})
 	if err != nil {
 		setCmdsErr(cmds, err)
-		failedCmds[node] = cmds
+		failedCmds.mu.Lock()
+		failedCmds.m[node] = cmds
+		failedCmds.mu.Unlock()
 		return err
 	}
 
-	// Set read timeout for all commands.
-	cn.SetReadTimeout(c.opt.ReadTimeout)
-
-	return c.pipelineReadCmds(cn, cmds, failedCmds)
+	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+		return c.pipelineReadCmds(node, rd, cmds, failedCmds)
+	})
+	return err
 }
 
 func (c *ClusterClient) pipelineReadCmds(
-	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+	node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
+	var firstErr error
 	for _, cmd := range cmds {
-		err := cmd.readReply(cn)
+		err := cmd.readReply(rd)
 		if err == nil {
 			continue
 		}
@@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds(
 			continue
 		}
 
-		return err
+		failedCmds.mu.Lock()
+		failedCmds.m[node] = append(failedCmds.m[node], cmd)
+		failedCmds.mu.Unlock()
+		if firstErr == nil {
+			firstErr = err
+		}
 	}
-	return nil
+	return firstErr
 }
 
 func (c *ClusterClient) checkMovedErr(
-	cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+	cmd Cmder, err error, failedCmds *cmdsMap,
 ) bool {
 	moved, ask, addr := internal.IsMovedError(err)
@@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr(
 			return false
 		}
 
-		failedCmds[node] = append(failedCmds[node], cmd)
+		failedCmds.mu.Lock()
+		failedCmds.m[node] = append(failedCmds.m[node], cmd)
+		failedCmds.mu.Unlock()
 		return true
 	}
@@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr(
 			return false
 		}
 
-		failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+		failedCmds.mu.Lock()
+		failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
+		failedCmds.mu.Unlock()
 		return true
 	}
@@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
 			time.Sleep(c.retryBackoff(attempt))
 		}
 
-		failedCmds := make(map[*clusterNode][]Cmder)
+		failedCmds := newCmdsMap()
+		var wg sync.WaitGroup
 
 		for node, cmds := range cmdsMap {
-			cn, err := node.Client.getConn()
-			if err != nil {
-				if err == pool.ErrClosed {
-					c.remapCmds(cmds, failedCmds)
-				} else {
-					setCmdsErr(cmds, err)
+			wg.Add(1)
+			go func(node *clusterNode, cmds []Cmder) {
+				defer wg.Done()
+
+				cn, err := node.Client.getConn()
+				if err != nil {
+					if err == pool.ErrClosed {
+						c.mapCmdsByNode(cmds, failedCmds)
+					} else {
+						setCmdsErr(cmds, err)
+					}
+					return
 				}
-				continue
-			}
 
-			err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
-			if err == nil || internal.IsRedisError(err) {
-				node.Client.connPool.Put(cn)
-			} else {
-				node.Client.connPool.Remove(cn)
-			}
+				err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
+				node.Client.releaseConnStrict(cn, err)
+			}(node, cmds)
 		}
 
-		if len(failedCmds) == 0 {
+		wg.Wait()
+		if len(failedCmds.m) == 0 {
 			break
 		}
-			cmdsMap = failedCmds
+			cmdsMap = failedCmds.m
 		}
 	}
 
-	return firstCmdsErr(cmds)
+	return cmdsFirstErr(cmds)
 }
 
 func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
@@ -1429,37 +1453,41 @@ func (c *ClusterClient) txPipelineProcessCmds(
-	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
-	cn.SetWriteTimeout(c.opt.WriteTimeout)
-	if err := txPipelineWriteMulti(cn, cmds); err != nil {
-		setCmdsErr(cmds, err)
-		failedCmds[node] = cmds
-		return err
-	}
-
-	// Set read timeout for all commands.
-	cn.SetReadTimeout(c.opt.ReadTimeout)
-
-	if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+		return txPipelineWriteMulti(wr, cmds)
+	})
+	if err != nil {
 		setCmdsErr(cmds, err)
+		failedCmds.mu.Lock()
+		failedCmds.m[node] = cmds
+		failedCmds.mu.Unlock()
 		return err
 	}
 
-	return pipelineReadCmds(cn, cmds)
+	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+		err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+		if err != nil {
+			setCmdsErr(cmds, err)
+			return err
+		}
+		return pipelineReadCmds(rd, cmds)
+	})
+	return err
 }
 
 func (c *ClusterClient) txPipelineReadQueued(
-	cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+	rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
 	// Parse queued replies.
 	var statusCmd StatusCmd
-	if err := statusCmd.readReply(cn); err != nil {
+	if err := statusCmd.readReply(rd); err != nil {
 		return err
 	}
 
 	for _, cmd := range cmds {
-		err := statusCmd.readReply(cn)
+		err := statusCmd.readReply(rd)
 		if err == nil {
 			continue
 		}
@@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued(
 	}
 
 	// Parse number of replies.
-	line, err := cn.Rd.ReadLine()
+	line, err := rd.ReadLine()
 	if err != nil {
 		if err == Nil {
 			err = TxFailedErr
@@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
 	return nil
 }
 
-func (c *ClusterClient) pubSub(channels []string) *PubSub {
+func (c *ClusterClient) pubSub() *PubSub {
 	var node *clusterNode
 	pubsub := &PubSub{
 		opt: c.opt.clientOptions(),
 
 		newConn: func(channels []string) (*pool.Conn, error) {
-			if node == nil {
-				var slot int
-				if len(channels) > 0 {
-					slot = hashtag.Slot(channels[0])
-				} else {
-					slot = -1
-				}
+			if node != nil {
+				panic("node != nil")
+			}
 
-				masterNode, err := c.slotMasterNode(slot)
-				if err != nil {
-					return nil, err
-				}
-				node = masterNode
+			slot := hashtag.Slot(channels[0])
+
+			var err error
+			node, err = c.slotMasterNode(slot)
+			if err != nil {
+				return nil, err
 			}
-			return node.Client.newConn()
+
+			cn, err := node.Client.newConn()
+			if err != nil {
+				return nil, err
+			}
+
+			return cn, nil
 		},
 		closeConn: func(cn *pool.Conn) error {
-			return node.Client.connPool.CloseConn(cn)
+			err := node.Client.connPool.CloseConn(cn)
+			node = nil
+			return err
 		},
 	}
+
 	pubsub.init()
+
 	return pubsub
 }
 
 // Subscribe subscribes the client to the specified channels.
 // Channels can be omitted to create empty subscription.
 func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
-	pubsub := c.pubSub(channels)
+	pubsub := c.pubSub()
 	if len(channels) > 0 {
 		_ = pubsub.Subscribe(channels...)
 	}
@@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
 // PSubscribe subscribes the client to the given patterns.
 // Patterns can be omitted to create empty subscription.
 func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
-	pubsub := c.pubSub(channels)
+	pubsub := c.pubSub()
 	if len(channels) > 0 {
 		_ = pubsub.PSubscribe(channels...)
 	}
 	return pubsub
 }
 
-func useOriginAddr(originAddr, nodeAddr string) bool {
-	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
-	if err != nil {
-		return false
-	}
-
-	nodeIP := net.ParseIP(nodeHost)
-	if nodeIP == nil {
-		return false
-	}
-
-	if !nodeIP.IsLoopback() {
-		return false
-	}
-
-	_, originPort, err := net.SplitHostPort(originAddr)
-	if err != nil {
-		return false
-	}
-
-	return nodePort == originPort
-}
-
-func isLoopbackAddr(addr string) bool {
-	host, _, err := net.SplitHostPort(addr)
-	if err != nil {
-		return false
-	}
-
-	ip := net.ParseIP(host)
-	if ip == nil {
-		return false
-	}
-
-	return ip.IsLoopback()
-}
-
 func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
 	for _, n := range nodes {
 		if n == node {
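
The new ClusterOptions fields in this diff (OnNewNode, MinIdleConns, MaxConnAge) are set like any other client option. A minimal sketch of how they might be used; the addresses and values are illustrative assumptions, not part of this change:

package main

import (
	"fmt"
	"time"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"10.0.0.1:7000", "10.0.0.2:7000"}, // hypothetical nodes
		// New in this revision: called once for each node client as it is created.
		OnNewNode: func(node *redis.Client) {
			fmt.Println("node added:", node.Options().Addr)
		},
		MinIdleConns: 10,               // new: idle connections kept warm per node
		MaxConnAge:   30 * time.Minute, // new: connections older than this are recycled
	})
	defer client.Close()
}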
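
replaceLoopbackHost, added in this diff, rewrites a node address only when its host parses as a loopback IP, substituting the origin's host while keeping the node's port; isLoopback treats unparseable hosts as loopback. A few worked inputs, assuming the cluster was dialed through the hypothetical origin "10.0.0.1:6379":

// replaceLoopbackHost("127.0.0.1:7001", "10.0.0.1")   == "10.0.0.1:7001"    (loopback host replaced)
// replaceLoopbackHost("192.168.1.5:7001", "10.0.0.1") == "192.168.1.5:7001" (not loopback, unchanged)
// replaceLoopbackHost("no-port", "10.0.0.1")          == "no-port"          (SplitHostPort fails, unchanged)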
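
ClusterClient.Do, added in this diff, wraps arbitrary arguments in a Cmd and routes it through Process, mirroring the single-node Client.Do. A short usage sketch; the helper name, key, and value are hypothetical:

// doExample assumes client is a *redis.ClusterClient like the one above.
func doExample(client *redis.ClusterClient) (interface{}, error) {
	if err := client.Do("set", "example-key", "example-value").Err(); err != nil {
		return nil, err
	}
	return client.Do("get", "example-key").Result()
}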
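
The pipeline rework fans each node's batch out in its own goroutine, which is why failed commands now accumulate in the mutex-guarded cmdsMap rather than a plain map. The synchronization pattern, reduced to a self-contained sketch with illustrative names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu     sync.Mutex
		failed = make(map[string][]string) // stand-in for cmdsMap's map[*clusterNode][]Cmder
		wg     sync.WaitGroup
	)
	for _, node := range []string{"node-a", "node-b"} {
		wg.Add(1)
		go func(node string) {
			defer wg.Done()
			// Each worker records its failures under the shared lock,
			// as defaultProcessPipeline does with failedCmds.mu.
			mu.Lock()
			failed[node] = append(failed[node], "some-cmd")
			mu.Unlock()
		}(node)
	}
	wg.Wait() // wait for all nodes before deciding whether to retry
	fmt.Println(failed)
}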