summaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/go-redis/redis/internal
diff options
context:
space:
mode:
authorToshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>2018-09-06 09:04:29 +0000
committerToshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>2018-09-07 06:03:01 +0000
commitd61931341176dad9ccff7c967a10d88fe54218fa (patch)
tree526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/vendor/github.com/go-redis/redis/internal
parent73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff)
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/internal')
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go81
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/error.go83
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go77
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/internal.go24
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/log.go15
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/once.go60
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go80
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go416
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_single.go53
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go109
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go316
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/proto/scan.go166
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go113
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go64
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/util.go29
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go7
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/util/strconv.go19
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go12
18 files changed, 1724 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go b/src/dma/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go
new file mode 100644
index 00000000..a9c56f07
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/consistenthash/consistenthash.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2013 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package consistenthash provides an implementation of a ring hash.
+package consistenthash
+
+import (
+ "hash/crc32"
+ "sort"
+ "strconv"
+)
+
+type Hash func(data []byte) uint32
+
+type Map struct {
+ hash Hash
+ replicas int
+ keys []int // Sorted
+ hashMap map[int]string
+}
+
+func New(replicas int, fn Hash) *Map {
+ m := &Map{
+ replicas: replicas,
+ hash: fn,
+ hashMap: make(map[int]string),
+ }
+ if m.hash == nil {
+ m.hash = crc32.ChecksumIEEE
+ }
+ return m
+}
+
+// Returns true if there are no items available.
+func (m *Map) IsEmpty() bool {
+ return len(m.keys) == 0
+}
+
+// Adds some keys to the hash.
+func (m *Map) Add(keys ...string) {
+ for _, key := range keys {
+ for i := 0; i < m.replicas; i++ {
+ hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
+ m.keys = append(m.keys, hash)
+ m.hashMap[hash] = key
+ }
+ }
+ sort.Ints(m.keys)
+}
+
+// Gets the closest item in the hash to the provided key.
+func (m *Map) Get(key string) string {
+ if m.IsEmpty() {
+ return ""
+ }
+
+ hash := int(m.hash([]byte(key)))
+
+ // Binary search for appropriate replica.
+ idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
+
+ // Means we have cycled back to the first replica.
+ if idx == len(m.keys) {
+ idx = 0
+ }
+
+ return m.hashMap[m.keys[idx]]
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/error.go b/src/dma/vendor/github.com/go-redis/redis/internal/error.go
new file mode 100644
index 00000000..e0ff8632
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/error.go
@@ -0,0 +1,83 @@
+package internal
+
+import (
+ "io"
+ "net"
+ "strings"
+
+ "github.com/go-redis/redis/internal/proto"
+)
+
+func IsRetryableError(err error, retryNetError bool) bool {
+ if IsNetworkError(err) {
+ return retryNetError
+ }
+ s := err.Error()
+ if s == "ERR max number of clients reached" {
+ return true
+ }
+ if strings.HasPrefix(s, "LOADING ") {
+ return true
+ }
+ if strings.HasPrefix(s, "READONLY ") {
+ return true
+ }
+ if strings.HasPrefix(s, "CLUSTERDOWN ") {
+ return true
+ }
+ return false
+}
+
+func IsRedisError(err error) bool {
+ _, ok := err.(proto.RedisError)
+ return ok
+}
+
+func IsNetworkError(err error) bool {
+ if err == io.EOF {
+ return true
+ }
+ _, ok := err.(net.Error)
+ return ok
+}
+
+func IsBadConn(err error, allowTimeout bool) bool {
+ if err == nil {
+ return false
+ }
+ if IsRedisError(err) {
+ return strings.HasPrefix(err.Error(), "READONLY ")
+ }
+ if allowTimeout {
+ if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ return false
+ }
+ }
+ return true
+}
+
+func IsMovedError(err error) (moved bool, ask bool, addr string) {
+ if !IsRedisError(err) {
+ return
+ }
+
+ s := err.Error()
+ if strings.HasPrefix(s, "MOVED ") {
+ moved = true
+ } else if strings.HasPrefix(s, "ASK ") {
+ ask = true
+ } else {
+ return
+ }
+
+ ind := strings.LastIndex(s, " ")
+ if ind == -1 {
+ return false, false, ""
+ }
+ addr = s[ind+1:]
+ return
+}
+
+func IsLoadingError(err error) bool {
+ return strings.HasPrefix(err.Error(), "LOADING ")
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go b/src/dma/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go
new file mode 100644
index 00000000..22f5b398
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/hashtag/hashtag.go
@@ -0,0 +1,77 @@
+package hashtag
+
+import (
+ "math/rand"
+ "strings"
+)
+
+const slotNumber = 16384
+
+// CRC16 implementation according to CCITT standards.
+// Copyright 2001-2010 Georges Menie (www.menie.org)
+// Copyright 2013 The Go Authors. All rights reserved.
+// http://redis.io/topics/cluster-spec#appendix-a-crc16-reference-implementation-in-ansi-c
+var crc16tab = [256]uint16{
+ 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
+ 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
+ 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
+ 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
+ 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
+ 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
+ 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
+ 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
+ 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
+ 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
+ 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
+ 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
+ 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
+ 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
+ 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
+ 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
+ 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
+ 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
+ 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
+ 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
+ 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
+ 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
+ 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
+ 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
+ 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
+ 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
+ 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
+ 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
+ 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
+ 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
+ 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
+ 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
+}
+
+func Key(key string) string {
+ if s := strings.IndexByte(key, '{'); s > -1 {
+ if e := strings.IndexByte(key[s+1:], '}'); e > 0 {
+ return key[s+1 : s+e+1]
+ }
+ }
+ return key
+}
+
+func RandomSlot() int {
+ return rand.Intn(slotNumber)
+}
+
+// hashSlot returns a consistent slot number between 0 and 16383
+// for any given string key.
+func Slot(key string) int {
+ if key == "" {
+ return RandomSlot()
+ }
+ key = Key(key)
+ return int(crc16sum(key)) % slotNumber
+}
+
+func crc16sum(key string) (crc uint16) {
+ for i := 0; i < len(key); i++ {
+ crc = (crc << 8) ^ crc16tab[(byte(crc>>8)^key[i])&0x00ff]
+ }
+ return
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/internal.go b/src/dma/vendor/github.com/go-redis/redis/internal/internal.go
new file mode 100644
index 00000000..ad3fc3c9
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/internal.go
@@ -0,0 +1,24 @@
+package internal
+
+import (
+ "math/rand"
+ "time"
+)
+
+// Retry backoff with jitter sleep to prevent overloaded conditions during intervals
+// https://www.awsarchitectureblog.com/2015/03/backoff.html
+func RetryBackoff(retry int, minBackoff, maxBackoff time.Duration) time.Duration {
+ if retry < 0 {
+ retry = 0
+ }
+
+ backoff := minBackoff << uint(retry)
+ if backoff > maxBackoff || backoff < minBackoff {
+ backoff = maxBackoff
+ }
+
+ if backoff == 0 {
+ return 0
+ }
+ return time.Duration(rand.Int63n(int64(backoff)))
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/log.go b/src/dma/vendor/github.com/go-redis/redis/internal/log.go
new file mode 100644
index 00000000..fd14222e
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/log.go
@@ -0,0 +1,15 @@
+package internal
+
+import (
+ "fmt"
+ "log"
+)
+
+var Logger *log.Logger
+
+func Logf(s string, args ...interface{}) {
+ if Logger == nil {
+ return
+ }
+ Logger.Output(2, fmt.Sprintf(s, args...))
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/once.go b/src/dma/vendor/github.com/go-redis/redis/internal/once.go
new file mode 100644
index 00000000..64f46272
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/once.go
@@ -0,0 +1,60 @@
+/*
+Copyright 2014 The Camlistore Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+// A Once will perform a successful action exactly once.
+//
+// Unlike a sync.Once, this Once's func returns an error
+// and is re-armed on failure.
+type Once struct {
+ m sync.Mutex
+ done uint32
+}
+
+// Do calls the function f if and only if Do has not been invoked
+// without error for this instance of Once. In other words, given
+// var once Once
+// if once.Do(f) is called multiple times, only the first call will
+// invoke f, even if f has a different value in each invocation unless
+// f returns an error. A new instance of Once is required for each
+// function to execute.
+//
+// Do is intended for initialization that must be run exactly once. Since f
+// is niladic, it may be necessary to use a function literal to capture the
+// arguments to a function to be invoked by Do:
+// err := config.once.Do(func() error { return config.init(filename) })
+func (o *Once) Do(f func() error) error {
+ if atomic.LoadUint32(&o.done) == 1 {
+ return nil
+ }
+ // Slow-path.
+ o.m.Lock()
+ defer o.m.Unlock()
+ var err error
+ if o.done == 0 {
+ err = f()
+ if err == nil {
+ atomic.StoreUint32(&o.done, 1)
+ }
+ }
+ return err
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
new file mode 100644
index 00000000..acaf3665
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
@@ -0,0 +1,80 @@
+package pool
+
+import (
+ "net"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/internal/proto"
+)
+
+var noDeadline = time.Time{}
+
+type Conn struct {
+ netConn net.Conn
+
+ Rd *proto.Reader
+ Wb *proto.WriteBuffer
+
+ Inited bool
+ usedAt atomic.Value
+}
+
+func NewConn(netConn net.Conn) *Conn {
+ cn := &Conn{
+ netConn: netConn,
+ Wb: proto.NewWriteBuffer(),
+ }
+ cn.Rd = proto.NewReader(cn.netConn)
+ cn.SetUsedAt(time.Now())
+ return cn
+}
+
+func (cn *Conn) UsedAt() time.Time {
+ return cn.usedAt.Load().(time.Time)
+}
+
+func (cn *Conn) SetUsedAt(tm time.Time) {
+ cn.usedAt.Store(tm)
+}
+
+func (cn *Conn) SetNetConn(netConn net.Conn) {
+ cn.netConn = netConn
+ cn.Rd.Reset(netConn)
+}
+
+func (cn *Conn) IsStale(timeout time.Duration) bool {
+ return timeout > 0 && time.Since(cn.UsedAt()) > timeout
+}
+
+func (cn *Conn) SetReadTimeout(timeout time.Duration) {
+ now := time.Now()
+ cn.SetUsedAt(now)
+ if timeout > 0 {
+ cn.netConn.SetReadDeadline(now.Add(timeout))
+ } else {
+ cn.netConn.SetReadDeadline(noDeadline)
+ }
+}
+
+func (cn *Conn) SetWriteTimeout(timeout time.Duration) {
+ now := time.Now()
+ cn.SetUsedAt(now)
+ if timeout > 0 {
+ cn.netConn.SetWriteDeadline(now.Add(timeout))
+ } else {
+ cn.netConn.SetWriteDeadline(noDeadline)
+ }
+}
+
+func (cn *Conn) Write(b []byte) (int, error) {
+ return cn.netConn.Write(b)
+}
+
+func (cn *Conn) RemoteAddr() net.Addr {
+ return cn.netConn.RemoteAddr()
+}
+
+func (cn *Conn) Close() error {
+ return cn.netConn.Close()
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
new file mode 100644
index 00000000..cab66904
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -0,0 +1,416 @@
+package pool
+
+import (
+ "errors"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/internal"
+)
+
+var ErrClosed = errors.New("redis: client is closed")
+var ErrPoolTimeout = errors.New("redis: connection pool timeout")
+
+var timers = sync.Pool{
+ New: func() interface{} {
+ t := time.NewTimer(time.Hour)
+ t.Stop()
+ return t
+ },
+}
+
+// Stats contains pool state information and accumulated stats.
+type Stats struct {
+ Hits uint32 // number of times free connection was found in the pool
+ Misses uint32 // number of times free connection was NOT found in the pool
+ Timeouts uint32 // number of times a wait timeout occurred
+
+ TotalConns uint32 // number of total connections in the pool
+ FreeConns uint32 // deprecated - use IdleConns
+ IdleConns uint32 // number of idle connections in the pool
+ StaleConns uint32 // number of stale connections removed from the pool
+}
+
+type Pooler interface {
+ NewConn() (*Conn, error)
+ CloseConn(*Conn) error
+
+ Get() (*Conn, error)
+ Put(*Conn)
+ Remove(*Conn)
+
+ Len() int
+ IdleLen() int
+ Stats() *Stats
+
+ Close() error
+}
+
+type Options struct {
+ Dialer func() (net.Conn, error)
+ OnClose func(*Conn) error
+
+ PoolSize int
+ PoolTimeout time.Duration
+ IdleTimeout time.Duration
+ IdleCheckFrequency time.Duration
+}
+
+type ConnPool struct {
+ opt *Options
+
+ dialErrorsNum uint32 // atomic
+
+ lastDialError error
+ lastDialErrorMu sync.RWMutex
+
+ queue chan struct{}
+
+ connsMu sync.Mutex
+ conns []*Conn
+
+ idleConnsMu sync.RWMutex
+ idleConns []*Conn
+
+ stats Stats
+
+ _closed uint32 // atomic
+}
+
+var _ Pooler = (*ConnPool)(nil)
+
+func NewConnPool(opt *Options) *ConnPool {
+ p := &ConnPool{
+ opt: opt,
+
+ queue: make(chan struct{}, opt.PoolSize),
+ conns: make([]*Conn, 0, opt.PoolSize),
+ idleConns: make([]*Conn, 0, opt.PoolSize),
+ }
+
+ if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
+ go p.reaper(opt.IdleCheckFrequency)
+ }
+
+ return p
+}
+
+func (p *ConnPool) NewConn() (*Conn, error) {
+ cn, err := p.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.connsMu.Unlock()
+ return cn, nil
+}
+
+func (p *ConnPool) newConn() (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
+ return nil, p.getLastDialError()
+ }
+
+ netConn, err := p.opt.Dialer()
+ if err != nil {
+ p.setLastDialError(err)
+ if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
+ go p.tryDial()
+ }
+ return nil, err
+ }
+
+ return NewConn(netConn), nil
+}
+
+func (p *ConnPool) tryDial() {
+ for {
+ if p.closed() {
+ return
+ }
+
+ conn, err := p.opt.Dialer()
+ if err != nil {
+ p.setLastDialError(err)
+ time.Sleep(time.Second)
+ continue
+ }
+
+ atomic.StoreUint32(&p.dialErrorsNum, 0)
+ _ = conn.Close()
+ return
+ }
+}
+
+func (p *ConnPool) setLastDialError(err error) {
+ p.lastDialErrorMu.Lock()
+ p.lastDialError = err
+ p.lastDialErrorMu.Unlock()
+}
+
+func (p *ConnPool) getLastDialError() error {
+ p.lastDialErrorMu.RLock()
+ err := p.lastDialError
+ p.lastDialErrorMu.RUnlock()
+ return err
+}
+
+// Get returns existed connection from the pool or creates a new one.
+func (p *ConnPool) Get() (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ err := p.waitTurn()
+ if err != nil {
+ return nil, err
+ }
+
+ for {
+ p.idleConnsMu.Lock()
+ cn := p.popIdle()
+ p.idleConnsMu.Unlock()
+
+ if cn == nil {
+ break
+ }
+
+ if cn.IsStale(p.opt.IdleTimeout) {
+ p.CloseConn(cn)
+ continue
+ }
+
+ atomic.AddUint32(&p.stats.Hits, 1)
+ return cn, nil
+ }
+
+ atomic.AddUint32(&p.stats.Misses, 1)
+
+ newcn, err := p.NewConn()
+ if err != nil {
+ p.freeTurn()
+ return nil, err
+ }
+
+ return newcn, nil
+}
+
+func (p *ConnPool) getTurn() {
+ p.queue <- struct{}{}
+}
+
+func (p *ConnPool) waitTurn() error {
+ select {
+ case p.queue <- struct{}{}:
+ return nil
+ default:
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return nil
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return ErrPoolTimeout
+ }
+ }
+}
+
+func (p *ConnPool) freeTurn() {
+ <-p.queue
+}
+
+func (p *ConnPool) popIdle() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
+ }
+
+ idx := len(p.idleConns) - 1
+ cn := p.idleConns[idx]
+ p.idleConns = p.idleConns[:idx]
+
+ return cn
+}
+
+func (p *ConnPool) Put(cn *Conn) {
+ buf := cn.Rd.PeekBuffered()
+ if buf != nil {
+ internal.Logf("connection has unread data: %.100q", buf)
+ p.Remove(cn)
+ return
+ }
+
+ p.idleConnsMu.Lock()
+ p.idleConns = append(p.idleConns, cn)
+ p.idleConnsMu.Unlock()
+ p.freeTurn()
+}
+
+func (p *ConnPool) Remove(cn *Conn) {
+ p.removeConn(cn)
+ p.freeTurn()
+ _ = p.closeConn(cn)
+}
+
+func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.removeConn(cn)
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) removeConn(cn *Conn) {
+ p.connsMu.Lock()
+ for i, c := range p.conns {
+ if c == cn {
+ p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ break
+ }
+ }
+ p.connsMu.Unlock()
+}
+
+func (p *ConnPool) closeConn(cn *Conn) error {
+ if p.opt.OnClose != nil {
+ _ = p.opt.OnClose(cn)
+ }
+ return cn.Close()
+}
+
+// Len returns total number of connections.
+func (p *ConnPool) Len() int {
+ p.connsMu.Lock()
+ l := len(p.conns)
+ p.connsMu.Unlock()
+ return l
+}
+
+// FreeLen returns number of idle connections.
+func (p *ConnPool) IdleLen() int {
+ p.idleConnsMu.RLock()
+ l := len(p.idleConns)
+ p.idleConnsMu.RUnlock()
+ return l
+}
+
+func (p *ConnPool) Stats() *Stats {
+ idleLen := p.IdleLen()
+ return &Stats{
+ Hits: atomic.LoadUint32(&p.stats.Hits),
+ Misses: atomic.LoadUint32(&p.stats.Misses),
+ Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
+
+ TotalConns: uint32(p.Len()),
+ FreeConns: uint32(idleLen),
+ IdleConns: uint32(idleLen),
+ StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
+ }
+}
+
+func (p *ConnPool) closed() bool {
+ return atomic.LoadUint32(&p._closed) == 1
+}
+
+func (p *ConnPool) Filter(fn func(*Conn) bool) error {
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if fn(cn) {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ }
+ p.connsMu.Unlock()
+ return firstErr
+}
+
+func (p *ConnPool) Close() error {
+ if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
+ return ErrClosed
+ }
+
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ p.conns = nil
+ p.connsMu.Unlock()
+
+ p.idleConnsMu.Lock()
+ p.idleConns = nil
+ p.idleConnsMu.Unlock()
+
+ return firstErr
+}
+
+func (p *ConnPool) reapStaleConn() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
+ }
+
+ cn := p.idleConns[0]
+ if !cn.IsStale(p.opt.IdleTimeout) {
+ return nil
+ }
+
+ p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+
+ return cn
+}
+
+func (p *ConnPool) ReapStaleConns() (int, error) {
+ var n int
+ for {
+ p.getTurn()
+
+ p.idleConnsMu.Lock()
+ cn := p.reapStaleConn()
+ p.idleConnsMu.Unlock()
+
+ if cn != nil {
+ p.removeConn(cn)
+ }
+
+ p.freeTurn()
+
+ if cn != nil {
+ p.closeConn(cn)
+ n++
+ } else {
+ break
+ }
+ }
+ return n, nil
+}
+
+func (p *ConnPool) reaper(frequency time.Duration) {
+ ticker := time.NewTicker(frequency)
+ defer ticker.Stop()
+
+ for range ticker.C {
+ if p.closed() {
+ break
+ }
+ n, err := p.ReapStaleConns()
+ if err != nil {
+ internal.Logf("ReapStaleConns failed: %s", err)
+ continue
+ }
+ atomic.AddUint32(&p.stats.StaleConns, uint32(n))
+ }
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_single.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
new file mode 100644
index 00000000..b35b78af
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_single.go
@@ -0,0 +1,53 @@
+package pool
+
+type SingleConnPool struct {
+ cn *Conn
+}
+
+var _ Pooler = (*SingleConnPool)(nil)
+
+func NewSingleConnPool(cn *Conn) *SingleConnPool {
+ return &SingleConnPool{
+ cn: cn,
+ }
+}
+
+func (p *SingleConnPool) NewConn() (*Conn, error) {
+ panic("not implemented")
+}
+
+func (p *SingleConnPool) CloseConn(*Conn) error {
+ panic("not implemented")
+}
+
+func (p *SingleConnPool) Get() (*Conn, error) {
+ return p.cn, nil
+}
+
+func (p *SingleConnPool) Put(cn *Conn) {
+ if p.cn != cn {
+ panic("p.cn != cn")
+ }
+}
+
+func (p *SingleConnPool) Remove(cn *Conn) {
+ if p.cn != cn {
+ panic("p.cn != cn")
+ }
+}
+
+func (p *SingleConnPool) Len() int {
+ return 1
+}
+
+func (p *SingleConnPool) IdleLen() int {
+ return 0
+}
+
+func (p *SingleConnPool) Stats() *Stats {
+ return nil
+}
+
+func (p *SingleConnPool) Close() error {
+ return nil
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
new file mode 100644
index 00000000..91bd9133
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool_sticky.go
@@ -0,0 +1,109 @@
+package pool
+
+import "sync"
+
+type StickyConnPool struct {
+ pool *ConnPool
+ reusable bool
+
+ cn *Conn
+ closed bool
+ mu sync.Mutex
+}
+
+var _ Pooler = (*StickyConnPool)(nil)
+
+func NewStickyConnPool(pool *ConnPool, reusable bool) *StickyConnPool {
+ return &StickyConnPool{
+ pool: pool,
+ reusable: reusable,
+ }
+}
+
+func (p *StickyConnPool) NewConn() (*Conn, error) {
+ panic("not implemented")
+}
+
+func (p *StickyConnPool) CloseConn(*Conn) error {
+ panic("not implemented")
+}
+
+func (p *StickyConnPool) Get() (*Conn, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.closed {
+ return nil, ErrClosed
+ }
+ if p.cn != nil {
+ return p.cn, nil
+ }
+
+ cn, err := p.pool.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ p.cn = cn
+ return cn, nil
+}
+
+func (p *StickyConnPool) putUpstream() {
+ p.pool.Put(p.cn)
+ p.cn = nil
+}
+
+func (p *StickyConnPool) Put(cn *Conn) {}
+
+func (p *StickyConnPool) removeUpstream() {
+ p.pool.Remove(p.cn)
+ p.cn = nil
+}
+
+func (p *StickyConnPool) Remove(cn *Conn) {
+ p.removeUpstream()
+}
+
+func (p *StickyConnPool) Len() int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.cn == nil {
+ return 0
+ }
+ return 1
+}
+
+func (p *StickyConnPool) IdleLen() int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.cn == nil {
+ return 1
+ }
+ return 0
+}
+
+func (p *StickyConnPool) Stats() *Stats {
+ return nil
+}
+
+func (p *StickyConnPool) Close() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.closed {
+ return ErrClosed
+ }
+ p.closed = true
+
+ if p.cn != nil {
+ if p.reusable {
+ p.putUpstream()
+ } else {
+ p.removeUpstream()
+ }
+ }
+
+ return nil
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
new file mode 100644
index 00000000..8c28c7b7
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
@@ -0,0 +1,316 @@
+package proto
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "strconv"
+
+ "github.com/go-redis/redis/internal/util"
+)
+
+const bytesAllocLimit = 1024 * 1024 // 1mb
+
+const (
+ ErrorReply = '-'
+ StatusReply = '+'
+ IntReply = ':'
+ StringReply = '$'
+ ArrayReply = '*'
+)
+
+//------------------------------------------------------------------------------
+
+const Nil = RedisError("redis: nil")
+
+type RedisError string
+
+func (e RedisError) Error() string { return string(e) }
+
+//------------------------------------------------------------------------------
+
+type MultiBulkParse func(*Reader, int64) (interface{}, error)
+
+type Reader struct {
+ src *bufio.Reader
+ buf []byte
+}
+
+func NewReader(rd io.Reader) *Reader {
+ return &Reader{
+ src: bufio.NewReader(rd),
+ buf: make([]byte, 4096),
+ }
+}
+
+func (r *Reader) Reset(rd io.Reader) {
+ r.src.Reset(rd)
+}
+
+func (r *Reader) PeekBuffered() []byte {
+ if n := r.src.Buffered(); n != 0 {
+ b, _ := r.src.Peek(n)
+ return b
+ }
+ return nil
+}
+
+func (r *Reader) ReadN(n int) ([]byte, error) {
+ b, err := readN(r.src, r.buf, n)
+ if err != nil {
+ return nil, err
+ }
+ r.buf = b
+ return b, nil
+}
+
+func (r *Reader) ReadLine() ([]byte, error) {
+ line, isPrefix, err := r.src.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+ if isPrefix {
+ return nil, bufio.ErrBufferFull
+ }
+ if len(line) == 0 {
+ return nil, fmt.Errorf("redis: reply is empty")
+ }
+ if isNilReply(line) {
+ return nil, Nil
+ }
+ return line, nil
+}
+
+func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
+ line, err := r.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case StatusReply:
+ return parseTmpStatusReply(line), nil
+ case IntReply:
+ return util.ParseInt(line[1:], 10, 64)
+ case StringReply:
+ return r.readTmpBytesReply(line)
+ case ArrayReply:
+ n, err := parseArrayLen(line)
+ if err != nil {
+ return nil, err
+ }
+ return m(r, n)
+ }
+ return nil, fmt.Errorf("redis: can't parse %.100q", line)
+}
+
+func (r *Reader) ReadIntReply() (int64, error) {
+ line, err := r.ReadLine()
+ if err != nil {
+ return 0, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return 0, ParseErrorReply(line)
+ case IntReply:
+ return util.ParseInt(line[1:], 10, 64)
+ default:
+ return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
+ }
+}
+
+func (r *Reader) ReadTmpBytesReply() ([]byte, error) {
+ line, err := r.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case StringReply:
+ return r.readTmpBytesReply(line)
+ case StatusReply:
+ return parseTmpStatusReply(line), nil
+ default:
+ return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
+ }
+}
+
+func (r *Reader) ReadBytesReply() ([]byte, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return nil, err
+ }
+ cp := make([]byte, len(b))
+ copy(cp, b)
+ return cp, nil
+}
+
+func (r *Reader) ReadStringReply() (string, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return "", err
+ }
+ return string(b), nil
+}
+
+func (r *Reader) ReadFloatReply() (float64, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return 0, err
+ }
+ return util.ParseFloat(b, 64)
+}
+
+func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
+ line, err := r.ReadLine()
+ if err != nil {
+ return nil, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case ArrayReply:
+ n, err := parseArrayLen(line)
+ if err != nil {
+ return nil, err
+ }
+ return m(r, n)
+ default:
+ return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
+ }
+}
+
+func (r *Reader) ReadArrayLen() (int64, error) {
+ line, err := r.ReadLine()
+ if err != nil {
+ return 0, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return 0, ParseErrorReply(line)
+ case ArrayReply:
+ return parseArrayLen(line)
+ default:
+ return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
+ }
+}
+
+func (r *Reader) ReadScanReply() ([]string, uint64, error) {
+ n, err := r.ReadArrayLen()
+ if err != nil {
+ return nil, 0, err
+ }
+ if n != 2 {
+ return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
+ }
+
+ cursor, err := r.ReadUint()
+ if err != nil {
+ return nil, 0, err
+ }
+
+ n, err = r.ReadArrayLen()
+ if err != nil {
+ return nil, 0, err
+ }
+
+ keys := make([]string, n)
+ for i := int64(0); i < n; i++ {
+ key, err := r.ReadStringReply()
+ if err != nil {
+ return nil, 0, err
+ }
+ keys[i] = key
+ }
+
+ return keys, cursor, err
+}
+
+func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) {
+ if isNilReply(line) {
+ return nil, Nil
+ }
+
+ replyLen, err := strconv.Atoi(string(line[1:]))
+ if err != nil {
+ return nil, err
+ }
+
+ b, err := r.ReadN(replyLen + 2)
+ if err != nil {
+ return nil, err
+ }
+ return b[:replyLen], nil
+}
+
+func (r *Reader) ReadInt() (int64, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return 0, err
+ }
+ return util.ParseInt(b, 10, 64)
+}
+
+func (r *Reader) ReadUint() (uint64, error) {
+ b, err := r.ReadTmpBytesReply()
+ if err != nil {
+ return 0, err
+ }
+ return util.ParseUint(b, 10, 64)
+}
+
+// --------------------------------------------------------------------
+
+func readN(r io.Reader, b []byte, n int) ([]byte, error) {
+ if n == 0 && b == nil {
+ return make([]byte, 0), nil
+ }
+
+ if cap(b) >= n {
+ b = b[:n]
+ _, err := io.ReadFull(r, b)
+ return b, err
+ }
+ b = b[:cap(b)]
+
+ pos := 0
+ for pos < n {
+ diff := n - len(b)
+ if diff > bytesAllocLimit {
+ diff = bytesAllocLimit
+ }
+ b = append(b, make([]byte, diff)...)
+
+ nn, err := io.ReadFull(r, b[pos:])
+ if err != nil {
+ return nil, err
+ }
+ pos += nn
+ }
+
+ return b, nil
+}
+
+func isNilReply(b []byte) bool {
+ return len(b) == 3 &&
+ (b[0] == StringReply || b[0] == ArrayReply) &&
+ b[1] == '-' && b[2] == '1'
+}
+
+func ParseErrorReply(line []byte) error {
+ return RedisError(string(line[1:]))
+}
+
+func parseTmpStatusReply(line []byte) []byte {
+ return line[1:]
+}
+
+func parseArrayLen(line []byte) (int64, error) {
+ if isNilReply(line) {
+ return 0, Nil
+ }
+ return util.ParseInt(line[1:], 10, 64)
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/scan.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/scan.go
new file mode 100644
index 00000000..3bdb33f9
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/scan.go
@@ -0,0 +1,166 @@
+package proto
+
+import (
+ "encoding"
+ "fmt"
+ "reflect"
+
+ "github.com/go-redis/redis/internal/util"
+)
+
+func Scan(b []byte, v interface{}) error {
+ switch v := v.(type) {
+ case nil:
+ return fmt.Errorf("redis: Scan(nil)")
+ case *string:
+ *v = util.BytesToString(b)
+ return nil
+ case *[]byte:
+ *v = b
+ return nil
+ case *int:
+ var err error
+ *v, err = util.Atoi(b)
+ return err
+ case *int8:
+ n, err := util.ParseInt(b, 10, 8)
+ if err != nil {
+ return err
+ }
+ *v = int8(n)
+ return nil
+ case *int16:
+ n, err := util.ParseInt(b, 10, 16)
+ if err != nil {
+ return err
+ }
+ *v = int16(n)
+ return nil
+ case *int32:
+ n, err := util.ParseInt(b, 10, 32)
+ if err != nil {
+ return err
+ }
+ *v = int32(n)
+ return nil
+ case *int64:
+ n, err := util.ParseInt(b, 10, 64)
+ if err != nil {
+ return err
+ }
+ *v = n
+ return nil
+ case *uint:
+ n, err := util.ParseUint(b, 10, 64)
+ if err != nil {
+ return err
+ }
+ *v = uint(n)
+ return nil
+ case *uint8:
+ n, err := util.ParseUint(b, 10, 8)
+ if err != nil {
+ return err
+ }
+ *v = uint8(n)
+ return nil
+ case *uint16:
+ n, err := util.ParseUint(b, 10, 16)
+ if err != nil {
+ return err
+ }
+ *v = uint16(n)
+ return nil
+ case *uint32:
+ n, err := util.ParseUint(b, 10, 32)
+ if err != nil {
+ return err
+ }
+ *v = uint32(n)
+ return nil
+ case *uint64:
+ n, err := util.ParseUint(b, 10, 64)
+ if err != nil {
+ return err
+ }
+ *v = n
+ return nil
+ case *float32:
+ n, err := util.ParseFloat(b, 32)
+ if err != nil {
+ return err
+ }
+ *v = float32(n)
+ return err
+ case *float64:
+ var err error
+ *v, err = util.ParseFloat(b, 64)
+ return err
+ case *bool:
+ *v = len(b) == 1 && b[0] == '1'
+ return nil
+ case encoding.BinaryUnmarshaler:
+ return v.UnmarshalBinary(b)
+ default:
+ return fmt.Errorf(
+ "redis: can't unmarshal %T (consider implementing BinaryUnmarshaler)", v)
+ }
+}
+
+func ScanSlice(data []string, slice interface{}) error {
+ v := reflect.ValueOf(slice)
+ if !v.IsValid() {
+ return fmt.Errorf("redis: ScanSlice(nil)")
+ }
+ if v.Kind() != reflect.Ptr {
+ return fmt.Errorf("redis: ScanSlice(non-pointer %T)", slice)
+ }
+ v = v.Elem()
+ if v.Kind() != reflect.Slice {
+ return fmt.Errorf("redis: ScanSlice(non-slice %T)", slice)
+ }
+
+ next := makeSliceNextElemFunc(v)
+ for i, s := range data {
+ elem := next()
+ if err := Scan([]byte(s), elem.Addr().Interface()); err != nil {
+ err = fmt.Errorf("redis: ScanSlice index=%d value=%q failed: %s", i, s, err)
+ return err
+ }
+ }
+
+ return nil
+}
+
+func makeSliceNextElemFunc(v reflect.Value) func() reflect.Value {
+ elemType := v.Type().Elem()
+
+ if elemType.Kind() == reflect.Ptr {
+ elemType = elemType.Elem()
+ return func() reflect.Value {
+ if v.Len() < v.Cap() {
+ v.Set(v.Slice(0, v.Len()+1))
+ elem := v.Index(v.Len() - 1)
+ if elem.IsNil() {
+ elem.Set(reflect.New(elemType))
+ }
+ return elem.Elem()
+ }
+
+ elem := reflect.New(elemType)
+ v.Set(reflect.Append(v, elem))
+ return elem.Elem()
+ }
+ }
+
+ zero := reflect.Zero(elemType)
+ return func() reflect.Value {
+ if v.Len() < v.Cap() {
+ v.Set(v.Slice(0, v.Len()+1))
+ return v.Index(v.Len() - 1)
+ }
+
+ v.Set(reflect.Append(v, zero))
+ return v.Index(v.Len() - 1)
+ }
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
new file mode 100644
index 00000000..664f4c33
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
@@ -0,0 +1,113 @@
+package proto
+
+import (
+ "encoding"
+ "fmt"
+ "strconv"
+)
+
+type WriteBuffer struct {
+ b []byte
+}
+
+func NewWriteBuffer() *WriteBuffer {
+ return &WriteBuffer{
+ b: make([]byte, 0, 4096),
+ }
+}
+
+func (w *WriteBuffer) Len() int { return len(w.b) }
+func (w *WriteBuffer) Bytes() []byte { return w.b }
+func (w *WriteBuffer) Reset() { w.b = w.b[:0] }
+
+func (w *WriteBuffer) Append(args []interface{}) error {
+ w.b = append(w.b, ArrayReply)
+ w.b = strconv.AppendUint(w.b, uint64(len(args)), 10)
+ w.b = append(w.b, '\r', '\n')
+
+ for _, arg := range args {
+ if err := w.append(arg); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (w *WriteBuffer) append(val interface{}) error {
+ switch v := val.(type) {
+ case nil:
+ w.AppendString("")
+ case string:
+ w.AppendString(v)
+ case []byte:
+ w.AppendBytes(v)
+ case int:
+ w.AppendString(formatInt(int64(v)))
+ case int8:
+ w.AppendString(formatInt(int64(v)))
+ case int16:
+ w.AppendString(formatInt(int64(v)))
+ case int32:
+ w.AppendString(formatInt(int64(v)))
+ case int64:
+ w.AppendString(formatInt(v))
+ case uint:
+ w.AppendString(formatUint(uint64(v)))
+ case uint8:
+ w.AppendString(formatUint(uint64(v)))
+ case uint16:
+ w.AppendString(formatUint(uint64(v)))
+ case uint32:
+ w.AppendString(formatUint(uint64(v)))
+ case uint64:
+ w.AppendString(formatUint(v))
+ case float32:
+ w.AppendString(formatFloat(float64(v)))
+ case float64:
+ w.AppendString(formatFloat(v))
+ case bool:
+ if v {
+ w.AppendString("1")
+ } else {
+ w.AppendString("0")
+ }
+ case encoding.BinaryMarshaler:
+ b, err := v.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ w.AppendBytes(b)
+ default:
+ return fmt.Errorf(
+ "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val)
+ }
+ return nil
+}
+
+func (w *WriteBuffer) AppendString(s string) {
+ w.b = append(w.b, StringReply)
+ w.b = strconv.AppendUint(w.b, uint64(len(s)), 10)
+ w.b = append(w.b, '\r', '\n')
+ w.b = append(w.b, s...)
+ w.b = append(w.b, '\r', '\n')
+}
+
+func (w *WriteBuffer) AppendBytes(p []byte) {
+ w.b = append(w.b, StringReply)
+ w.b = strconv.AppendUint(w.b, uint64(len(p)), 10)
+ w.b = append(w.b, '\r', '\n')
+ w.b = append(w.b, p...)
+ w.b = append(w.b, '\r', '\n')
+}
+
+func formatInt(n int64) string {
+ return strconv.FormatInt(n, 10)
+}
+
+func formatUint(u uint64) string {
+ return strconv.FormatUint(u, 10)
+}
+
+func formatFloat(f float64) string {
+ return strconv.FormatFloat(f, 'f', -1, 64)
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go
new file mode 100644
index 00000000..3b174172
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go
@@ -0,0 +1,64 @@
+/*
+Copyright 2013 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package singleflight provides a duplicate function call suppression
+// mechanism.
+package singleflight
+
+import "sync"
+
+// call is an in-flight or completed Do call
+type call struct {
+ wg sync.WaitGroup
+ val interface{}
+ err error
+}
+
+// Group represents a class of work and forms a namespace in which
+// units of work can be executed with duplicate suppression.
+type Group struct {
+ mu sync.Mutex // protects m
+ m map[string]*call // lazily initialized
+}
+
+// Do executes and returns the results of the given function, making
+// sure that only one execution is in-flight for a given key at a
+// time. If a duplicate comes in, the duplicate caller waits for the
+// original to complete and receives the same results.
+func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
+ g.mu.Lock()
+ if g.m == nil {
+ g.m = make(map[string]*call)
+ }
+ if c, ok := g.m[key]; ok {
+ g.mu.Unlock()
+ c.wg.Wait()
+ return c.val, c.err
+ }
+ c := new(call)
+ c.wg.Add(1)
+ g.m[key] = c
+ g.mu.Unlock()
+
+ c.val, c.err = fn()
+ c.wg.Done()
+
+ g.mu.Lock()
+ delete(g.m, key)
+ g.mu.Unlock()
+
+ return c.val, c.err
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util.go b/src/dma/vendor/github.com/go-redis/redis/internal/util.go
new file mode 100644
index 00000000..ffd2353e
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util.go
@@ -0,0 +1,29 @@
+package internal
+
+import "github.com/go-redis/redis/internal/util"
+
+func ToLower(s string) string {
+ if isLower(s) {
+ return s
+ }
+
+ b := make([]byte, len(s))
+ for i := range b {
+ c := s[i]
+ if c >= 'A' && c <= 'Z' {
+ c += 'a' - 'A'
+ }
+ b[i] = c
+ }
+ return util.BytesToString(b)
+}
+
+func isLower(s string) bool {
+ for i := 0; i < len(s); i++ {
+ c := s[i]
+ if c >= 'A' && c <= 'Z' {
+ return false
+ }
+ }
+ return true
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
new file mode 100644
index 00000000..cd891833
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
@@ -0,0 +1,7 @@
+// +build appengine
+
+package util
+
+func BytesToString(b []byte) string {
+ return string(b)
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/strconv.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/strconv.go
new file mode 100644
index 00000000..db503380
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/strconv.go
@@ -0,0 +1,19 @@
+package util
+
+import "strconv"
+
+func Atoi(b []byte) (int, error) {
+ return strconv.Atoi(BytesToString(b))
+}
+
+func ParseInt(b []byte, base int, bitSize int) (int64, error) {
+ return strconv.ParseInt(BytesToString(b), base, bitSize)
+}
+
+func ParseUint(b []byte, base int, bitSize int) (uint64, error) {
+ return strconv.ParseUint(BytesToString(b), base, bitSize)
+}
+
+func ParseFloat(b []byte, bitSize int) (float64, error) {
+ return strconv.ParseFloat(BytesToString(b), bitSize)
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
new file mode 100644
index 00000000..93a89c55
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
@@ -0,0 +1,12 @@
+// +build !appengine
+
+package util
+
+import (
+ "unsafe"
+)
+
+// BytesToString converts byte slice to string.
+func BytesToString(b []byte) string {
+ return *(*string)(unsafe.Pointer(&b))
+}