aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/go-redis/redis/sentinel.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/sentinel.go')
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/sentinel.go252
1 files changed, 154 insertions, 98 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/sentinel.go b/src/dma/vendor/github.com/go-redis/redis/sentinel.go
index 12c29a71..7cbb90bd 100644
--- a/src/dma/vendor/github.com/go-redis/redis/sentinel.go
+++ b/src/dma/vendor/github.com/go-redis/redis/sentinel.go
@@ -29,13 +29,17 @@ type FailoverOptions struct {
Password string
DB int
- MaxRetries int
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
},
}
c.baseClient.init()
- c.setProcessor(c.Process)
+ c.cmdable.setProcessor(c.Process)
return &c
}
@@ -115,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
return c
}
-func (c *SentinelClient) PubSub() *PubSub {
+func (c *SentinelClient) pubSub() *PubSub {
pubsub := &PubSub{
opt: c.opt,
@@ -128,14 +132,52 @@ func (c *SentinelClient) PubSub() *PubSub {
return pubsub
}
+// Subscribe subscribes the client to the specified channels.
+// Channels can be omitted to create empty subscription.
+func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub()
+ if len(channels) > 0 {
+ _ = pubsub.Subscribe(channels...)
+ }
+ return pubsub
+}
+
+// PSubscribe subscribes the client to the given patterns.
+// Patterns can be omitted to create empty subscription.
+func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub()
+ if len(channels) > 0 {
+ _ = pubsub.PSubscribe(channels...)
+ }
+ return pubsub
+}
+
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
- cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
+ cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
c.Process(cmd)
return cmd
}
func (c *SentinelClient) Sentinels(name string) *SliceCmd {
- cmd := NewSliceCmd("SENTINEL", "sentinels", name)
+ cmd := NewSliceCmd("sentinel", "sentinels", name)
+ c.Process(cmd)
+ return cmd
+}
+
+// Failover forces a failover as if the master was not reachable, and without
+// asking for agreement to other Sentinels.
+func (c *SentinelClient) Failover(name string) *StatusCmd {
+ cmd := NewStatusCmd("sentinel", "failover", name)
+ c.Process(cmd)
+ return cmd
+}
+
+// Reset resets all the masters with matching name. The pattern argument is a
+// glob-style pattern. The reset process clears any previous state in a master
+// (including a failover in progress), and removes every slave and sentinel
+// already discovered and associated with the master.
+func (c *SentinelClient) Reset(pattern string) *IntCmd {
+ cmd := NewIntCmd("sentinel", "reset", pattern)
c.Process(cmd)
return cmd
}
@@ -152,79 +194,81 @@ type sentinelFailover struct {
masterName string
_masterAddr string
sentinel *SentinelClient
+ pubsub *PubSub
}
-func (d *sentinelFailover) Close() error {
- return d.resetSentinel()
+func (c *sentinelFailover) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.sentinel != nil {
+ return c.closeSentinel()
+ }
+ return nil
}
-func (d *sentinelFailover) Pool() *pool.ConnPool {
- d.poolOnce.Do(func() {
- d.opt.Dialer = d.dial
- d.pool = newConnPool(d.opt)
+func (c *sentinelFailover) Pool() *pool.ConnPool {
+ c.poolOnce.Do(func() {
+ c.opt.Dialer = c.dial
+ c.pool = newConnPool(c.opt)
})
- return d.pool
+ return c.pool
}
-func (d *sentinelFailover) dial() (net.Conn, error) {
- addr, err := d.MasterAddr()
+func (c *sentinelFailover) dial() (net.Conn, error) {
+ addr, err := c.MasterAddr()
if err != nil {
return nil, err
}
- return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
+ return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
}
-func (d *sentinelFailover) MasterAddr() (string, error) {
- d.mu.Lock()
- defer d.mu.Unlock()
-
- addr, err := d.masterAddr()
+func (c *sentinelFailover) MasterAddr() (string, error) {
+ addr, err := c.masterAddr()
if err != nil {
return "", err
}
- d._switchMaster(addr)
-
+ c.switchMaster(addr)
return addr, nil
}
-func (d *sentinelFailover) masterAddr() (string, error) {
- // Try last working sentinel.
- if d.sentinel != nil {
- addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
- if err == nil {
- addr := net.JoinHostPort(addr[0], addr[1])
- return addr, nil
- }
-
- internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
- d.masterName, err)
- d._resetSentinel()
+func (c *sentinelFailover) masterAddr() (string, error) {
+ addr := c.getMasterAddr()
+ if addr != "" {
+ return addr, nil
}
- for i, sentinelAddr := range d.sentinelAddrs {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
- DialTimeout: d.opt.DialTimeout,
- ReadTimeout: d.opt.ReadTimeout,
- WriteTimeout: d.opt.WriteTimeout,
+ MaxRetries: c.opt.MaxRetries,
+
+ DialTimeout: c.opt.DialTimeout,
+ ReadTimeout: c.opt.ReadTimeout,
+ WriteTimeout: c.opt.WriteTimeout,
- PoolSize: d.opt.PoolSize,
- PoolTimeout: d.opt.PoolTimeout,
- IdleTimeout: d.opt.IdleTimeout,
+ PoolSize: c.opt.PoolSize,
+ PoolTimeout: c.opt.PoolTimeout,
+ IdleTimeout: c.opt.IdleTimeout,
+ IdleCheckFrequency: c.opt.IdleCheckFrequency,
+
+ TLSConfig: c.opt.TLSConfig,
})
- masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
+ masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
- d.masterName, err)
- sentinel.Close()
+ c.masterName, err)
+ _ = sentinel.Close()
continue
}
// Push working sentinel to the top.
- d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
- d.setSentinel(sentinel)
+ c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
+ c.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
@@ -233,17 +277,41 @@ func (d *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable")
}
-func (c *sentinelFailover) switchMaster(addr string) {
- c.mu.Lock()
- c._switchMaster(addr)
- c.mu.Unlock()
+func (c *sentinelFailover) getMasterAddr() string {
+ c.mu.RLock()
+ sentinel := c.sentinel
+ c.mu.RUnlock()
+
+ if sentinel == nil {
+ return ""
+ }
+
+ addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
+ if err != nil {
+ internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
+ c.masterName, err)
+ c.mu.Lock()
+ if c.sentinel == sentinel {
+ c.closeSentinel()
+ }
+ c.mu.Unlock()
+ return ""
+ }
+
+ return net.JoinHostPort(addr[0], addr[1])
}
-func (c *sentinelFailover) _switchMaster(addr string) {
- if c._masterAddr == addr {
+func (c *sentinelFailover) switchMaster(addr string) {
+ c.mu.RLock()
+ masterAddr := c._masterAddr
+ c.mu.RUnlock()
+ if masterAddr == addr {
return
}
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
internal.Logf("sentinel: new master=%q addr=%q",
c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
@@ -252,32 +320,36 @@ func (c *sentinelFailover) _switchMaster(addr string) {
c._masterAddr = addr
}
-func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
- d.discoverSentinels(sentinel)
- d.sentinel = sentinel
- go d.listen(sentinel)
+func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
+ c.discoverSentinels(sentinel)
+ c.sentinel = sentinel
+
+ c.pubsub = sentinel.Subscribe("+switch-master")
+ go c.listen(c.pubsub)
}
-func (d *sentinelFailover) resetSentinel() error {
- var err error
- d.mu.Lock()
- if d.sentinel != nil {
- err = d._resetSentinel()
+func (c *sentinelFailover) closeSentinel() error {
+ var firstErr error
+
+ err := c.pubsub.Close()
+ if err != nil && firstErr == err {
+ firstErr = err
}
- d.mu.Unlock()
- return err
-}
+ c.pubsub = nil
-func (d *sentinelFailover) _resetSentinel() error {
- err := d.sentinel.Close()
- d.sentinel = nil
- return err
+ err = c.sentinel.Close()
+ if err != nil && firstErr == err {
+ firstErr = err
+ }
+ c.sentinel = nil
+
+ return firstErr
}
-func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
- sentinels, err := sentinel.Sentinels(d.masterName).Result()
+func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
+ sentinels, err := sentinel.Sentinels(c.masterName).Result()
if err != nil {
- internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
+ internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
return
}
for _, sentinel := range sentinels {
@@ -286,49 +358,33 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
key := vals[i].(string)
if key == "name" {
sentinelAddr := vals[i+1].(string)
- if !contains(d.sentinelAddrs, sentinelAddr) {
- internal.Logf(
- "sentinel: discovered new sentinel=%q for master=%q",
- sentinelAddr, d.masterName,
- )
- d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
+ if !contains(c.sentinelAddrs, sentinelAddr) {
+ internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
+ sentinelAddr, c.masterName)
+ c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
}
}
}
}
}
-func (d *sentinelFailover) listen(sentinel *SentinelClient) {
- pubsub := sentinel.PubSub()
- defer pubsub.Close()
-
- err := pubsub.Subscribe("+switch-master")
- if err != nil {
- internal.Logf("sentinel: Subscribe failed: %s", err)
- d.resetSentinel()
- return
- }
-
+func (c *sentinelFailover) listen(pubsub *PubSub) {
+ ch := pubsub.Channel()
for {
- msg, err := pubsub.ReceiveMessage()
- if err != nil {
- if err == pool.ErrClosed {
- d.resetSentinel()
- return
- }
- internal.Logf("sentinel: ReceiveMessage failed: %s", err)
- continue
+ msg, ok := <-ch
+ if !ok {
+ break
}
switch msg.Channel {
case "+switch-master":
parts := strings.Split(msg.Payload, " ")
- if parts[0] != d.masterName {
+ if parts[0] != c.masterName {
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
- d.switchMaster(addr)
+ c.switchMaster(addr)
}
}
}