aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/go-redis/redis/internal/pool
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/internal/pool')
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go51
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go132
2 files changed, 128 insertions, 55 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
index acaf3665..1095bfe5 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
@@ -13,19 +13,21 @@ var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
- Rd *proto.Reader
- Wb *proto.WriteBuffer
+ rd *proto.Reader
+ rdLocked bool
+ wr *proto.Writer
- Inited bool
- usedAt atomic.Value
+ InitedAt time.Time
+ pooled bool
+ usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
- Wb: proto.NewWriteBuffer(),
}
- cn.Rd = proto.NewReader(cn.netConn)
+ cn.rd = proto.NewReader(netConn)
+ cn.wr = proto.NewWriter(netConn)
cn.SetUsedAt(time.Now())
return cn
}
@@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
- cn.Rd.Reset(netConn)
+ cn.rd.Reset(netConn)
+ cn.wr.Reset(netConn)
}
-func (cn *Conn) IsStale(timeout time.Duration) bool {
- return timeout > 0 && time.Since(cn.UsedAt()) > timeout
-}
-
-func (cn *Conn) SetReadTimeout(timeout time.Duration) {
+func (cn *Conn) setReadTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
- cn.netConn.SetReadDeadline(now.Add(timeout))
- } else {
- cn.netConn.SetReadDeadline(noDeadline)
+ return cn.netConn.SetReadDeadline(now.Add(timeout))
}
+ return cn.netConn.SetReadDeadline(noDeadline)
}
-func (cn *Conn) SetWriteTimeout(timeout time.Duration) {
+func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
- cn.netConn.SetWriteDeadline(now.Add(timeout))
- } else {
- cn.netConn.SetWriteDeadline(noDeadline)
+ return cn.netConn.SetWriteDeadline(now.Add(timeout))
}
+ return cn.netConn.SetWriteDeadline(noDeadline)
}
func (cn *Conn) Write(b []byte) (int, error) {
@@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr()
}
+func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
+ _ = cn.setReadTimeout(timeout)
+ return fn(cn.rd)
+}
+
+func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
+ _ = cn.setWriteTimeout(timeout)
+
+ firstErr := fn(cn.wr)
+ err := cn.wr.Flush()
+ if err != nil && firstErr == nil {
+ firstErr = err
+ }
+ return firstErr
+}
+
func (cn *Conn) Close() error {
return cn.netConn.Close()
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
index cab66904..9cecee8a 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -28,7 +28,6 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
- FreeConns uint32 // deprecated - use IdleConns
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@@ -53,6 +52,8 @@ type Options struct {
OnClose func(*Conn) error
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -63,16 +64,16 @@ type ConnPool struct {
dialErrorsNum uint32 // atomic
- lastDialError error
lastDialErrorMu sync.RWMutex
+ lastDialError error
queue chan struct{}
- connsMu sync.Mutex
- conns []*Conn
-
- idleConnsMu sync.RWMutex
- idleConns []*Conn
+ connsMu sync.Mutex
+ conns []*Conn
+ idleConns []*Conn
+ poolSize int
+ idleConnsLen int
stats Stats
@@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool {
idleConns: make([]*Conn, 0, opt.PoolSize),
}
+ for i := 0; i < opt.MinIdleConns; i++ {
+ p.checkMinIdleConns()
+ }
+
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
@@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool {
return p
}
+func (p *ConnPool) checkMinIdleConns() {
+ if p.opt.MinIdleConns == 0 {
+ return
+ }
+ if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
+ p.poolSize++
+ p.idleConnsLen++
+ go p.addIdleConn()
+ }
+}
+
+func (p *ConnPool) addIdleConn() {
+ cn, err := p.newConn(true)
+ if err != nil {
+ return
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.idleConns = append(p.idleConns, cn)
+ p.connsMu.Unlock()
+}
+
func (p *ConnPool) NewConn() (*Conn, error) {
- cn, err := p.newConn()
+ return p._NewConn(false)
+}
+
+func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
+ cn, err := p.newConn(pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
+ if pooled {
+ if p.poolSize < p.opt.PoolSize {
+ p.poolSize++
+ } else {
+ cn.pooled = false
+ }
+ }
p.connsMu.Unlock()
return cn, nil
}
-func (p *ConnPool) newConn() (*Conn, error) {
+func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
@@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) {
return nil, err
}
- return NewConn(netConn), nil
+ cn := NewConn(netConn)
+ cn.pooled = pooled
+ return cn, nil
}
func (p *ConnPool) tryDial() {
@@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) {
}
for {
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
cn := p.popIdle()
- p.idleConnsMu.Unlock()
+ p.connsMu.Unlock()
if cn == nil {
break
}
- if cn.IsStale(p.opt.IdleTimeout) {
- p.CloseConn(cn)
+ if p.isStaleConn(cn) {
+ _ = p.CloseConn(cn)
continue
}
@@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) {
atomic.AddUint32(&p.stats.Misses, 1)
- newcn, err := p.NewConn()
+ newcn, err := p._NewConn(true)
if err != nil {
p.freeTurn()
return nil, err
@@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn {
idx := len(p.idleConns) - 1
cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx]
-
+ p.idleConnsLen--
+ p.checkMinIdleConns()
return cn
}
func (p *ConnPool) Put(cn *Conn) {
- buf := cn.Rd.PeekBuffered()
- if buf != nil {
- internal.Logf("connection has unread data: %.100q", buf)
+ if !cn.pooled {
p.Remove(cn)
return
}
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
- p.idleConnsMu.Unlock()
+ p.idleConnsLen++
+ p.connsMu.Unlock()
p.freeTurn()
}
@@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns {
if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ if cn.pooled {
+ p.poolSize--
+ p.checkMinIdleConns()
+ }
break
}
}
@@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error {
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
- l := len(p.conns)
+ n := len(p.conns)
p.connsMu.Unlock()
- return l
+ return n
}
-// FreeLen returns number of idle connections.
+// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
- p.idleConnsMu.RLock()
- l := len(p.idleConns)
- p.idleConnsMu.RUnlock()
- return l
+ p.connsMu.Lock()
+ n := p.idleConnsLen
+ p.connsMu.Unlock()
+ return n
}
func (p *ConnPool) Stats() *Stats {
@@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats {
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
- FreeConns: uint32(idleLen),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
@@ -349,11 +393,10 @@ func (p *ConnPool) Close() error {
}
}
p.conns = nil
- p.connsMu.Unlock()
-
- p.idleConnsMu.Lock()
+ p.poolSize = 0
p.idleConns = nil
- p.idleConnsMu.Unlock()
+ p.idleConnsLen = 0
+ p.connsMu.Unlock()
return firstErr
}
@@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn {
}
cn := p.idleConns[0]
- if !cn.IsStale(p.opt.IdleTimeout) {
+ if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+ p.idleConnsLen--
return cn
}
@@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
for {
p.getTurn()
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
cn := p.reapStaleConn()
- p.idleConnsMu.Unlock()
+ p.connsMu.Unlock()
if cn != nil {
p.removeConn(cn)
@@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) {
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
}
}
+
+func (p *ConnPool) isStaleConn(cn *Conn) bool {
+ if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
+ return false
+ }
+
+ now := time.Now()
+ if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
+ return true
+ }
+ if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
+ return true
+ }
+
+ return false
+}