diff options
author | Tomofumi Hayashi <tohayash@redhat.com> | 2019-04-27 20:38:39 +0900 |
---|---|---|
committer | Tomofumi Hayashi <tohayash@redhat.com> | 2019-04-27 20:40:37 +0900 |
commit | 4d11ca17d0f73f5bd783f45900118295fdfed46b (patch) | |
tree | cce8575b02ac850d2b30ec12a5c4083c48e85c6c /src/dma/vendor/github.com/go-redis/redis/internal | |
parent | 07e4a96e4996f3d39b92dd601b3ed0d23bfbaa0c (diff) |
barometer: update DMA's vendoring packages
Change-Id: I0578b094f1ecdaed20c906be2ba29d51b8089d7c
Signed-off-by: Tomofumi Hayashi <tohayash@redhat.com>
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/internal')
9 files changed, 387 insertions, 338 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/error.go b/src/dma/vendor/github.com/go-redis/redis/internal/error.go index e0ff8632..34f6bd4d 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/error.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/error.go @@ -8,9 +8,18 @@ import ( "github.com/go-redis/redis/internal/proto" ) -func IsRetryableError(err error, retryNetError bool) bool { - if IsNetworkError(err) { - return retryNetError +func IsRetryableError(err error, retryTimeout bool) bool { + if err == nil { + return false + } + if err == io.EOF { + return true + } + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + return retryTimeout + } + return true } s := err.Error() if s == "ERR max number of clients reached" { @@ -33,20 +42,13 @@ func IsRedisError(err error) bool { return ok } -func IsNetworkError(err error) bool { - if err == io.EOF { - return true - } - _, ok := err.(net.Error) - return ok -} - func IsBadConn(err error, allowTimeout bool) bool { if err == nil { return false } if IsRedisError(err) { - return strings.HasPrefix(err.Error(), "READONLY ") + // #790 + return IsReadOnlyError(err) } if allowTimeout { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { @@ -81,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { func IsLoadingError(err error) bool { return strings.HasPrefix(err.Error(), "LOADING ") } + +func IsReadOnlyError(err error) bool { + return strings.HasPrefix(err.Error(), "READONLY ") +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go index acaf3665..1095bfe5 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -13,19 +13,21 @@ var noDeadline = time.Time{} type Conn struct { netConn net.Conn - Rd *proto.Reader - Wb *proto.WriteBuffer + rd *proto.Reader + rdLocked bool + wr *proto.Writer - Inited bool - usedAt atomic.Value + InitedAt time.Time + pooled bool + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { cn := &Conn{ netConn: netConn, - Wb: proto.NewWriteBuffer(), } - cn.Rd = proto.NewReader(cn.netConn) + cn.rd = proto.NewReader(netConn) + cn.wr = proto.NewWriter(netConn) cn.SetUsedAt(time.Now()) return cn } @@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) { func (cn *Conn) SetNetConn(netConn net.Conn) { cn.netConn = netConn - cn.Rd.Reset(netConn) + cn.rd.Reset(netConn) + cn.wr.Reset(netConn) } -func (cn *Conn) IsStale(timeout time.Duration) bool { - return timeout > 0 && time.Since(cn.UsedAt()) > timeout -} - -func (cn *Conn) SetReadTimeout(timeout time.Duration) { +func (cn *Conn) setReadTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetReadDeadline(now.Add(timeout)) - } else { - cn.netConn.SetReadDeadline(noDeadline) + return cn.netConn.SetReadDeadline(now.Add(timeout)) } + return cn.netConn.SetReadDeadline(noDeadline) } -func (cn *Conn) SetWriteTimeout(timeout time.Duration) { +func (cn *Conn) setWriteTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetWriteDeadline(now.Add(timeout)) - } else { - cn.netConn.SetWriteDeadline(noDeadline) + return cn.netConn.SetWriteDeadline(now.Add(timeout)) } + return cn.netConn.SetWriteDeadline(noDeadline) } func (cn *Conn) Write(b []byte) (int, error) { @@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr { return cn.netConn.RemoteAddr() } +func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error { + _ = cn.setReadTimeout(timeout) + return fn(cn.rd) +} + +func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error { + _ = cn.setWriteTimeout(timeout) + + firstErr := fn(cn.wr) + err := cn.wr.Flush() + if err != nil && firstErr == nil { + firstErr = err + } + return firstErr +} + func (cn *Conn) Close() error { return cn.netConn.Close() } diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go index cab66904..9cecee8a 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -28,7 +28,6 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // deprecated - use IdleConns IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -53,6 +52,8 @@ type Options struct { OnClose func(*Conn) error PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -63,16 +64,16 @@ type ConnPool struct { dialErrorsNum uint32 // atomic - lastDialError error lastDialErrorMu sync.RWMutex + lastDialError error queue chan struct{} - connsMu sync.Mutex - conns []*Conn - - idleConnsMu sync.RWMutex - idleConns []*Conn + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int stats Stats @@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool { idleConns: make([]*Conn, 0, opt.PoolSize), } + for i := 0; i < opt.MinIdleConns; i++ { + p.checkMinIdleConns() + } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } @@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool { return p } +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go p.addIdleConn() + } +} + +func (p *ConnPool) addIdleConn() { + cn, err := p.newConn(true) + if err != nil { + return + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() +} + func (p *ConnPool) NewConn() (*Conn, error) { - cn, err := p.newConn() + return p._NewConn(false) +} + +func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) { + cn, err := p.newConn(pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) + if pooled { + if p.poolSize < p.opt.PoolSize { + p.poolSize++ + } else { + cn.pooled = false + } + } p.connsMu.Unlock() return cn, nil } -func (p *ConnPool) newConn() (*Conn, error) { +func (p *ConnPool) newConn(pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) { return nil, err } - return NewConn(netConn), nil + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil } func (p *ConnPool) tryDial() { @@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) { } for { - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.popIdle() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn == nil { break } - if cn.IsStale(p.opt.IdleTimeout) { - p.CloseConn(cn) + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) continue } @@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) { atomic.AddUint32(&p.stats.Misses, 1) - newcn, err := p.NewConn() + newcn, err := p._NewConn(true) if err != nil { p.freeTurn() return nil, err @@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn { idx := len(p.idleConns) - 1 cn := p.idleConns[idx] p.idleConns = p.idleConns[:idx] - + p.idleConnsLen-- + p.checkMinIdleConns() return cn } func (p *ConnPool) Put(cn *Conn) { - buf := cn.Rd.PeekBuffered() - if buf != nil { - internal.Logf("connection has unread data: %.100q", buf) + if !cn.pooled { p.Remove(cn) return } - p.idleConnsMu.Lock() + p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) - p.idleConnsMu.Unlock() + p.idleConnsLen++ + p.connsMu.Unlock() p.freeTurn() } @@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } break } } @@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error { // Len returns total number of connections. func (p *ConnPool) Len() int { p.connsMu.Lock() - l := len(p.conns) + n := len(p.conns) p.connsMu.Unlock() - return l + return n } -// FreeLen returns number of idle connections. +// IdleLen returns number of idle connections. func (p *ConnPool) IdleLen() int { - p.idleConnsMu.RLock() - l := len(p.idleConns) - p.idleConnsMu.RUnlock() - return l + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n } func (p *ConnPool) Stats() *Stats { @@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats { Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(idleLen), IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } @@ -349,11 +393,10 @@ func (p *ConnPool) Close() error { } } p.conns = nil - p.connsMu.Unlock() - - p.idleConnsMu.Lock() + p.poolSize = 0 p.idleConns = nil - p.idleConnsMu.Unlock() + p.idleConnsLen = 0 + p.connsMu.Unlock() return firstErr } @@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn { } cn := p.idleConns[0] - if !cn.IsStale(p.opt.IdleTimeout) { + if !p.isStaleConn(cn) { return nil } p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- return cn } @@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) { for { p.getTurn() - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.reapStaleConn() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn != nil { p.removeConn(cn) @@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) { atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go index 8c28c7b7..896b6f65 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go @@ -9,8 +9,6 @@ import ( "github.com/go-redis/redis/internal/util" ) -const bytesAllocLimit = 1024 * 1024 // 1mb - const ( ErrorReply = '-' StatusReply = '+' @@ -32,40 +30,23 @@ func (e RedisError) Error() string { return string(e) } type MultiBulkParse func(*Reader, int64) (interface{}, error) type Reader struct { - src *bufio.Reader - buf []byte + rd *bufio.Reader + _buf []byte } func NewReader(rd io.Reader) *Reader { return &Reader{ - src: bufio.NewReader(rd), - buf: make([]byte, 4096), + rd: bufio.NewReader(rd), + _buf: make([]byte, 64), } } func (r *Reader) Reset(rd io.Reader) { - r.src.Reset(rd) -} - -func (r *Reader) PeekBuffered() []byte { - if n := r.src.Buffered(); n != 0 { - b, _ := r.src.Peek(n) - return b - } - return nil -} - -func (r *Reader) ReadN(n int) ([]byte, error) { - b, err := readN(r.src, r.buf, n) - if err != nil { - return nil, err - } - r.buf = b - return b, nil + r.rd.Reset(rd) } func (r *Reader) ReadLine() ([]byte, error) { - line, isPrefix, err := r.src.ReadLine() + line, isPrefix, err := r.rd.ReadLine() if err != nil { return nil, err } @@ -91,11 +72,11 @@ func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { case ErrorReply: return nil, ParseErrorReply(line) case StatusReply: - return parseTmpStatusReply(line), nil + return string(line[1:]), nil case IntReply: return util.ParseInt(line[1:], 10, 64) case StringReply: - return r.readTmpBytesReply(line) + return r.readStringReply(line) case ArrayReply: n, err := parseArrayLen(line) if err != nil { @@ -121,47 +102,42 @@ func (r *Reader) ReadIntReply() (int64, error) { } } -func (r *Reader) ReadTmpBytesReply() ([]byte, error) { +func (r *Reader) ReadString() (string, error) { line, err := r.ReadLine() if err != nil { - return nil, err + return "", err } switch line[0] { case ErrorReply: - return nil, ParseErrorReply(line) + return "", ParseErrorReply(line) case StringReply: - return r.readTmpBytesReply(line) + return r.readStringReply(line) case StatusReply: - return parseTmpStatusReply(line), nil + return string(line[1:]), nil + case IntReply: + return string(line[1:]), nil default: - return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) + return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line) } } -func (r *Reader) ReadBytesReply() ([]byte, error) { - b, err := r.ReadTmpBytesReply() - if err != nil { - return nil, err +func (r *Reader) readStringReply(line []byte) (string, error) { + if isNilReply(line) { + return "", Nil } - cp := make([]byte, len(b)) - copy(cp, b) - return cp, nil -} -func (r *Reader) ReadStringReply() (string, error) { - b, err := r.ReadTmpBytesReply() + replyLen, err := strconv.Atoi(string(line[1:])) if err != nil { return "", err } - return string(b), nil -} -func (r *Reader) ReadFloatReply() (float64, error) { - b, err := r.ReadTmpBytesReply() + b := make([]byte, replyLen+2) + _, err = io.ReadFull(r.rd, b) if err != nil { - return 0, err + return "", err } - return util.ParseFloat(b, 64) + + return util.BytesToString(b[:replyLen]), nil } func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { @@ -219,7 +195,7 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { keys := make([]string, n) for i := int64(0); i < n; i++ { - key, err := r.ReadStringReply() + key, err := r.ReadString() if err != nil { return nil, 0, err } @@ -229,69 +205,71 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { return keys, cursor, err } -func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) { - if isNilReply(line) { - return nil, Nil - } - - replyLen, err := strconv.Atoi(string(line[1:])) +func (r *Reader) ReadInt() (int64, error) { + b, err := r.readTmpBytesReply() if err != nil { - return nil, err + return 0, err } + return util.ParseInt(b, 10, 64) +} - b, err := r.ReadN(replyLen + 2) +func (r *Reader) ReadUint() (uint64, error) { + b, err := r.readTmpBytesReply() if err != nil { - return nil, err + return 0, err } - return b[:replyLen], nil + return util.ParseUint(b, 10, 64) } -func (r *Reader) ReadInt() (int64, error) { - b, err := r.ReadTmpBytesReply() +func (r *Reader) ReadFloatReply() (float64, error) { + b, err := r.readTmpBytesReply() if err != nil { return 0, err } - return util.ParseInt(b, 10, 64) + return util.ParseFloat(b, 64) } -func (r *Reader) ReadUint() (uint64, error) { - b, err := r.ReadTmpBytesReply() +func (r *Reader) readTmpBytesReply() ([]byte, error) { + line, err := r.ReadLine() if err != nil { - return 0, err + return nil, err + } + switch line[0] { + case ErrorReply: + return nil, ParseErrorReply(line) + case StringReply: + return r._readTmpBytesReply(line) + case StatusReply: + return line[1:], nil + default: + return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) } - return util.ParseUint(b, 10, 64) } -// -------------------------------------------------------------------- +func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) { + if isNilReply(line) { + return nil, Nil + } -func readN(r io.Reader, b []byte, n int) ([]byte, error) { - if n == 0 && b == nil { - return make([]byte, 0), nil + replyLen, err := strconv.Atoi(string(line[1:])) + if err != nil { + return nil, err } - if cap(b) >= n { - b = b[:n] - _, err := io.ReadFull(r, b) - return b, err + buf := r.buf(replyLen + 2) + _, err = io.ReadFull(r.rd, buf) + if err != nil { + return nil, err } - b = b[:cap(b)] - pos := 0 - for pos < n { - diff := n - len(b) - if diff > bytesAllocLimit { - diff = bytesAllocLimit - } - b = append(b, make([]byte, diff)...) + return buf[:replyLen], nil +} - nn, err := io.ReadFull(r, b[pos:]) - if err != nil { - return nil, err - } - pos += nn +func (r *Reader) buf(n int) []byte { + if d := n - cap(r._buf); d > 0 { + r._buf = append(r._buf, make([]byte, d)...) } - - return b, nil + return r._buf[:n] } func isNilReply(b []byte) bool { @@ -304,10 +282,6 @@ func ParseErrorReply(line []byte) error { return RedisError(string(line[1:])) } -func parseTmpStatusReply(line []byte) []byte { - return line[1:] -} - func parseArrayLen(line []byte) (int64, error) { if isNilReply(line) { return 0, Nil diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go deleted file mode 100644 index 664f4c33..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go +++ /dev/null @@ -1,113 +0,0 @@ -package proto - -import ( - "encoding" - "fmt" - "strconv" -) - -type WriteBuffer struct { - b []byte -} - -func NewWriteBuffer() *WriteBuffer { - return &WriteBuffer{ - b: make([]byte, 0, 4096), - } -} - -func (w *WriteBuffer) Len() int { return len(w.b) } -func (w *WriteBuffer) Bytes() []byte { return w.b } -func (w *WriteBuffer) Reset() { w.b = w.b[:0] } - -func (w *WriteBuffer) Append(args []interface{}) error { - w.b = append(w.b, ArrayReply) - w.b = strconv.AppendUint(w.b, uint64(len(args)), 10) - w.b = append(w.b, '\r', '\n') - - for _, arg := range args { - if err := w.append(arg); err != nil { - return err - } - } - return nil -} - -func (w *WriteBuffer) append(val interface{}) error { - switch v := val.(type) { - case nil: - w.AppendString("") - case string: - w.AppendString(v) - case []byte: - w.AppendBytes(v) - case int: - w.AppendString(formatInt(int64(v))) - case int8: - w.AppendString(formatInt(int64(v))) - case int16: - w.AppendString(formatInt(int64(v))) - case int32: - w.AppendString(formatInt(int64(v))) - case int64: - w.AppendString(formatInt(v)) - case uint: - w.AppendString(formatUint(uint64(v))) - case uint8: - w.AppendString(formatUint(uint64(v))) - case uint16: - w.AppendString(formatUint(uint64(v))) - case uint32: - w.AppendString(formatUint(uint64(v))) - case uint64: - w.AppendString(formatUint(v)) - case float32: - w.AppendString(formatFloat(float64(v))) - case float64: - w.AppendString(formatFloat(v)) - case bool: - if v { - w.AppendString("1") - } else { - w.AppendString("0") - } - case encoding.BinaryMarshaler: - b, err := v.MarshalBinary() - if err != nil { - return err - } - w.AppendBytes(b) - default: - return fmt.Errorf( - "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val) - } - return nil -} - -func (w *WriteBuffer) AppendString(s string) { - w.b = append(w.b, StringReply) - w.b = strconv.AppendUint(w.b, uint64(len(s)), 10) - w.b = append(w.b, '\r', '\n') - w.b = append(w.b, s...) - w.b = append(w.b, '\r', '\n') -} - -func (w *WriteBuffer) AppendBytes(p []byte) { - w.b = append(w.b, StringReply) - w.b = strconv.AppendUint(w.b, uint64(len(p)), 10) - w.b = append(w.b, '\r', '\n') - w.b = append(w.b, p...) - w.b = append(w.b, '\r', '\n') -} - -func formatInt(n int64) string { - return strconv.FormatInt(n, 10) -} - -func formatUint(u uint64) string { - return strconv.FormatUint(u, 10) -} - -func formatFloat(f float64) string { - return strconv.FormatFloat(f, 'f', -1, 64) -} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go new file mode 100644 index 00000000..d106ce0e --- /dev/null +++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go @@ -0,0 +1,159 @@ +package proto + +import ( + "bufio" + "encoding" + "fmt" + "io" + "strconv" + + "github.com/go-redis/redis/internal/util" +) + +type Writer struct { + wr *bufio.Writer + + lenBuf []byte + numBuf []byte +} + +func NewWriter(wr io.Writer) *Writer { + return &Writer{ + wr: bufio.NewWriter(wr), + + lenBuf: make([]byte, 64), + numBuf: make([]byte, 64), + } +} + +func (w *Writer) WriteArgs(args []interface{}) error { + err := w.wr.WriteByte(ArrayReply) + if err != nil { + return err + } + + err = w.writeLen(len(args)) + if err != nil { + return err + } + + for _, arg := range args { + err := w.writeArg(arg) + if err != nil { + return err + } + } + + return nil +} + +func (w *Writer) writeLen(n int) error { + w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10) + w.lenBuf = append(w.lenBuf, '\r', '\n') + _, err := w.wr.Write(w.lenBuf) + return err +} + +func (w *Writer) writeArg(v interface{}) error { + switch v := v.(type) { + case nil: + return w.string("") + case string: + return w.string(v) + case []byte: + return w.bytes(v) + case int: + return w.int(int64(v)) + case int8: + return w.int(int64(v)) + case int16: + return w.int(int64(v)) + case int32: + return w.int(int64(v)) + case int64: + return w.int(v) + case uint: + return w.uint(uint64(v)) + case uint8: + return w.uint(uint64(v)) + case uint16: + return w.uint(uint64(v)) + case uint32: + return w.uint(uint64(v)) + case uint64: + return w.uint(v) + case float32: + return w.float(float64(v)) + case float64: + return w.float(v) + case bool: + if v { + return w.int(1) + } else { + return w.int(0) + } + case encoding.BinaryMarshaler: + b, err := v.MarshalBinary() + if err != nil { + return err + } + return w.bytes(b) + default: + return fmt.Errorf( + "redis: can't marshal %T (implement encoding.BinaryMarshaler)", v) + } +} + +func (w *Writer) bytes(b []byte) error { + err := w.wr.WriteByte(StringReply) + if err != nil { + return err + } + + err = w.writeLen(len(b)) + if err != nil { + return err + } + + _, err = w.wr.Write(b) + if err != nil { + return err + } + + return w.crlf() +} + +func (w *Writer) string(s string) error { + return w.bytes(util.StringToBytes(s)) +} + +func (w *Writer) uint(n uint64) error { + w.numBuf = strconv.AppendUint(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) int(n int64) error { + w.numBuf = strconv.AppendInt(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) float(f float64) error { + w.numBuf = strconv.AppendFloat(w.numBuf[:0], f, 'f', -1, 64) + return w.bytes(w.numBuf) +} + +func (w *Writer) crlf() error { + err := w.wr.WriteByte('\r') + if err != nil { + return err + } + return w.wr.WriteByte('\n') +} + +func (w *Writer) Reset(wr io.Writer) { + w.wr.Reset(wr) +} + +func (w *Writer) Flush() error { + return w.wr.Flush() +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go deleted file mode 100644 index 3b174172..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight - -import "sync" - -// call is an in-flight or completed Do call -type call struct { - wg sync.WaitGroup - val interface{} - err error -} - -// Group represents a class of work and forms a namespace in which -// units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - g.mu.Unlock() - c.wg.Wait() - return c.val, c.err - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - c.val, c.err = fn() - c.wg.Done() - - g.mu.Lock() - delete(g.m, key) - g.mu.Unlock() - - return c.val, c.err -} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go index cd891833..1b3060eb 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go @@ -5,3 +5,7 @@ package util func BytesToString(b []byte) string { return string(b) } + +func StringToBytes(s string) []byte { + return []byte(s) +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go index 93a89c55..c9868aac 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go @@ -10,3 +10,13 @@ import ( func BytesToString(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } + +// StringToBytes converts string to byte slice. +func StringToBytes(s string) []byte { + return *(*[]byte)(unsafe.Pointer( + &struct { + string + Cap int + }{s, len(s)}, + )) +} |