diff options
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/internal/pool')
-rw-r--r-- | src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go | 51 | ||||
-rw-r--r-- | src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go | 132 |
2 files changed, 128 insertions, 55 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go index acaf3665..1095bfe5 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -13,19 +13,21 @@ var noDeadline = time.Time{} type Conn struct { netConn net.Conn - Rd *proto.Reader - Wb *proto.WriteBuffer + rd *proto.Reader + rdLocked bool + wr *proto.Writer - Inited bool - usedAt atomic.Value + InitedAt time.Time + pooled bool + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { cn := &Conn{ netConn: netConn, - Wb: proto.NewWriteBuffer(), } - cn.Rd = proto.NewReader(cn.netConn) + cn.rd = proto.NewReader(netConn) + cn.wr = proto.NewWriter(netConn) cn.SetUsedAt(time.Now()) return cn } @@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) { func (cn *Conn) SetNetConn(netConn net.Conn) { cn.netConn = netConn - cn.Rd.Reset(netConn) + cn.rd.Reset(netConn) + cn.wr.Reset(netConn) } -func (cn *Conn) IsStale(timeout time.Duration) bool { - return timeout > 0 && time.Since(cn.UsedAt()) > timeout -} - -func (cn *Conn) SetReadTimeout(timeout time.Duration) { +func (cn *Conn) setReadTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetReadDeadline(now.Add(timeout)) - } else { - cn.netConn.SetReadDeadline(noDeadline) + return cn.netConn.SetReadDeadline(now.Add(timeout)) } + return cn.netConn.SetReadDeadline(noDeadline) } -func (cn *Conn) SetWriteTimeout(timeout time.Duration) { +func (cn *Conn) setWriteTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetWriteDeadline(now.Add(timeout)) - } else { - cn.netConn.SetWriteDeadline(noDeadline) + return cn.netConn.SetWriteDeadline(now.Add(timeout)) } + return cn.netConn.SetWriteDeadline(noDeadline) } func (cn *Conn) Write(b []byte) (int, error) { @@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr { return cn.netConn.RemoteAddr() } +func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error { + _ = cn.setReadTimeout(timeout) + return fn(cn.rd) +} + +func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error { + _ = cn.setWriteTimeout(timeout) + + firstErr := fn(cn.wr) + err := cn.wr.Flush() + if err != nil && firstErr == nil { + firstErr = err + } + return firstErr +} + func (cn *Conn) Close() error { return cn.netConn.Close() } diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go index cab66904..9cecee8a 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -28,7 +28,6 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // deprecated - use IdleConns IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -53,6 +52,8 @@ type Options struct { OnClose func(*Conn) error PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -63,16 +64,16 @@ type ConnPool struct { dialErrorsNum uint32 // atomic - lastDialError error lastDialErrorMu sync.RWMutex + lastDialError error queue chan struct{} - connsMu sync.Mutex - conns []*Conn - - idleConnsMu sync.RWMutex - idleConns []*Conn + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int stats Stats @@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool { idleConns: make([]*Conn, 0, opt.PoolSize), } + for i := 0; i < opt.MinIdleConns; i++ { + p.checkMinIdleConns() + } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } @@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool { return p } +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go p.addIdleConn() + } +} + +func (p *ConnPool) addIdleConn() { + cn, err := p.newConn(true) + if err != nil { + return + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() +} + func (p *ConnPool) NewConn() (*Conn, error) { - cn, err := p.newConn() + return p._NewConn(false) +} + +func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) { + cn, err := p.newConn(pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) + if pooled { + if p.poolSize < p.opt.PoolSize { + p.poolSize++ + } else { + cn.pooled = false + } + } p.connsMu.Unlock() return cn, nil } -func (p *ConnPool) newConn() (*Conn, error) { +func (p *ConnPool) newConn(pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) { return nil, err } - return NewConn(netConn), nil + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil } func (p *ConnPool) tryDial() { @@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) { } for { - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.popIdle() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn == nil { break } - if cn.IsStale(p.opt.IdleTimeout) { - p.CloseConn(cn) + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) continue } @@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) { atomic.AddUint32(&p.stats.Misses, 1) - newcn, err := p.NewConn() + newcn, err := p._NewConn(true) if err != nil { p.freeTurn() return nil, err @@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn { idx := len(p.idleConns) - 1 cn := p.idleConns[idx] p.idleConns = p.idleConns[:idx] - + p.idleConnsLen-- + p.checkMinIdleConns() return cn } func (p *ConnPool) Put(cn *Conn) { - buf := cn.Rd.PeekBuffered() - if buf != nil { - internal.Logf("connection has unread data: %.100q", buf) + if !cn.pooled { p.Remove(cn) return } - p.idleConnsMu.Lock() + p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) - p.idleConnsMu.Unlock() + p.idleConnsLen++ + p.connsMu.Unlock() p.freeTurn() } @@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } break } } @@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error { // Len returns total number of connections. func (p *ConnPool) Len() int { p.connsMu.Lock() - l := len(p.conns) + n := len(p.conns) p.connsMu.Unlock() - return l + return n } -// FreeLen returns number of idle connections. +// IdleLen returns number of idle connections. func (p *ConnPool) IdleLen() int { - p.idleConnsMu.RLock() - l := len(p.idleConns) - p.idleConnsMu.RUnlock() - return l + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n } func (p *ConnPool) Stats() *Stats { @@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats { Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(idleLen), IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } @@ -349,11 +393,10 @@ func (p *ConnPool) Close() error { } } p.conns = nil - p.connsMu.Unlock() - - p.idleConnsMu.Lock() + p.poolSize = 0 p.idleConns = nil - p.idleConnsMu.Unlock() + p.idleConnsLen = 0 + p.connsMu.Unlock() return firstErr } @@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn { } cn := p.idleConns[0] - if !cn.IsStale(p.opt.IdleTimeout) { + if !p.isStaleConn(cn) { return nil } p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- return cn } @@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) { for { p.getTurn() - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.reapStaleConn() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn != nil { p.removeConn(cn) @@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) { atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + return true + } + + return false +} |