aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com')
-rw-r--r--src/dma/vendor/github.com/BurntSushi/toml/COPYING27
-rw-r--r--src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING27
-rw-r--r--src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING27
-rw-r--r--src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING27
-rw-r--r--src/dma/vendor/github.com/BurntSushi/toml/lex.go2
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/.travis.yml3
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md13
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/Makefile4
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/cluster.go499
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/command.go993
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/commands.go380
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/error.go30
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go51
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go132
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go162
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go113
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go159
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go64
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go4
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go10
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/options.go27
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/parser.go394
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/pipeline.go7
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/pubsub.go127
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/redis.go173
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/result.go2
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/ring.go106
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/sentinel.go252
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/tx.go4
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/universal.go102
-rw-r--r--src/dma/vendor/github.com/labstack/echo/.travis.yml10
-rw-r--r--src/dma/vendor/github.com/labstack/echo/Gopkg.lock75
-rw-r--r--src/dma/vendor/github.com/labstack/echo/Gopkg.toml42
-rw-r--r--src/dma/vendor/github.com/labstack/echo/Makefile14
-rw-r--r--src/dma/vendor/github.com/labstack/echo/README.md2
-rw-r--r--src/dma/vendor/github.com/labstack/echo/bind.go48
-rw-r--r--src/dma/vendor/github.com/labstack/echo/context.go90
-rw-r--r--src/dma/vendor/github.com/labstack/echo/echo.go113
-rw-r--r--src/dma/vendor/github.com/labstack/echo/group.go19
-rw-r--r--src/dma/vendor/github.com/labstack/echo/log.go1
-rw-r--r--src/dma/vendor/github.com/labstack/echo/router.go47
-rw-r--r--src/dma/vendor/github.com/labstack/gommon/LICENSE2
-rw-r--r--src/dma/vendor/github.com/labstack/gommon/log/log.go2
-rw-r--r--src/dma/vendor/github.com/libvirt/libvirt-go/domain.go53
-rw-r--r--src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h24
-rw-r--r--src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go20
-rw-r--r--src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h8
-rw-r--r--src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go208
-rw-r--r--src/dma/vendor/github.com/mattn/go-colorable/go.mod3
-rw-r--r--src/dma/vendor/github.com/mattn/go-colorable/go.sum4
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/.travis.yml4
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/go.mod3
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/go.sum2
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go (renamed from src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go)12
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go15
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go6
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go21
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go9
-rw-r--r--src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go6
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/.gitignore8
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/.travis.yml7
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/LICENSE2
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/README.md2
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/auth.go16
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/channel.go5
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/connection.go43
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/delivery.go2
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/go.mod3
-rwxr-xr-xsrc/dma/vendor/github.com/streadway/amqp/pre-commit88
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/types.go3
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/uri.go9
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/write.go5
-rw-r--r--src/dma/vendor/github.com/valyala/fasttemplate/go.mod3
-rw-r--r--src/dma/vendor/github.com/valyala/fasttemplate/go.sum2
74 files changed, 3053 insertions, 1929 deletions
diff --git a/src/dma/vendor/github.com/BurntSushi/toml/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/COPYING
index 5a8e3325..01b57432 100644
--- a/src/dma/vendor/github.com/BurntSushi/toml/COPYING
+++ b/src/dma/vendor/github.com/BurntSushi/toml/COPYING
@@ -1,14 +1,21 @@
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- Version 2, December 2004
+The MIT License (MIT)
- Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
+Copyright (c) 2013 TOML authors
- Everyone is permitted to copy and distribute verbatim or modified
- copies of this license document, and changing it is allowed as long
- as the name is changed.
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
-
- 0. You just DO WHAT THE FUCK YOU WANT TO.
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING
index 5a8e3325..01b57432 100644
--- a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING
+++ b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING
@@ -1,14 +1,21 @@
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- Version 2, December 2004
+The MIT License (MIT)
- Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
+Copyright (c) 2013 TOML authors
- Everyone is permitted to copy and distribute verbatim or modified
- copies of this license document, and changing it is allowed as long
- as the name is changed.
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
-
- 0. You just DO WHAT THE FUCK YOU WANT TO.
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING
index 5a8e3325..01b57432 100644
--- a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING
+++ b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING
@@ -1,14 +1,21 @@
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- Version 2, December 2004
+The MIT License (MIT)
- Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
+Copyright (c) 2013 TOML authors
- Everyone is permitted to copy and distribute verbatim or modified
- copies of this license document, and changing it is allowed as long
- as the name is changed.
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
-
- 0. You just DO WHAT THE FUCK YOU WANT TO.
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING
index 5a8e3325..01b57432 100644
--- a/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING
+++ b/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING
@@ -1,14 +1,21 @@
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- Version 2, December 2004
+The MIT License (MIT)
- Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
+Copyright (c) 2013 TOML authors
- Everyone is permitted to copy and distribute verbatim or modified
- copies of this license document, and changing it is allowed as long
- as the name is changed.
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
- DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
- TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
-
- 0. You just DO WHAT THE FUCK YOU WANT TO.
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/src/dma/vendor/github.com/BurntSushi/toml/lex.go b/src/dma/vendor/github.com/BurntSushi/toml/lex.go
index 6dee7fc7..e0a742a8 100644
--- a/src/dma/vendor/github.com/BurntSushi/toml/lex.go
+++ b/src/dma/vendor/github.com/BurntSushi/toml/lex.go
@@ -775,7 +775,7 @@ func lexDatetime(lx *lexer) stateFn {
return lexDatetime
}
switch r {
- case '-', 'T', ':', '.', 'Z':
+ case '-', 'T', ':', '.', 'Z', '+':
return lexDatetime
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/.travis.yml b/src/dma/vendor/github.com/go-redis/redis/.travis.yml
index 39ffc2be..6b110b4c 100644
--- a/src/dma/vendor/github.com/go-redis/redis/.travis.yml
+++ b/src/dma/vendor/github.com/go-redis/redis/.travis.yml
@@ -5,10 +5,9 @@ services:
- redis-server
go:
- - 1.7.x
- - 1.8.x
- 1.9.x
- 1.10.x
+ - 1.11.x
- tip
matrix:
diff --git a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md
index cb0e1b8e..19645661 100644
--- a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md
+++ b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md
@@ -1,5 +1,18 @@
# Changelog
+## Unreleased
+
+- Cluster and Ring pipelines process commands for each node in its own goroutine.
+
+## 6.14
+
+- Added Options.MinIdleConns.
+- Added Options.MaxConnAge.
+- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
+- Add Client.Do to simplify creating custom commands.
+- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
+- Lower memory usage.
+
## v6.13
- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.
diff --git a/src/dma/vendor/github.com/go-redis/redis/Makefile b/src/dma/vendor/github.com/go-redis/redis/Makefile
index 1fbdac91..fa3b4e00 100644
--- a/src/dma/vendor/github.com/go-redis/redis/Makefile
+++ b/src/dma/vendor/github.com/go-redis/redis/Makefile
@@ -3,6 +3,8 @@ all: testdeps
go test ./... -short -race
env GOOS=linux GOARCH=386 go test ./...
go vet
+ go get github.com/gordonklaus/ineffassign
+ ineffassign .
testdeps: testdata/redis/src/redis-server
@@ -13,7 +15,7 @@ bench: testdeps
testdata/redis:
mkdir -p $@
- wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@
+ wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@
testdata/redis/src/redis-server: testdata/redis
sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $</src/Makefile
diff --git a/src/dma/vendor/github.com/go-redis/redis/cluster.go b/src/dma/vendor/github.com/go-redis/redis/cluster.go
index 7a1af143..0cecc62c 100644
--- a/src/dma/vendor/github.com/go-redis/redis/cluster.go
+++ b/src/dma/vendor/github.com/go-redis/redis/cluster.go
@@ -3,11 +3,11 @@ package redis
import (
"context"
"crypto/tls"
- "errors"
"fmt"
"math"
"math/rand"
"net"
+ "runtime"
"sort"
"sync"
"sync/atomic"
@@ -17,7 +17,6 @@ import (
"github.com/go-redis/redis/internal/hashtag"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
- "github.com/go-redis/redis/internal/singleflight"
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@@ -49,14 +48,18 @@ type ClusterOptions struct {
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
+ // Optional hook that is called when a new node is created.
+ OnNewNode func(*Client)
+
// Following options are copied from Options struct.
OnConnect func(*Conn) error
+ Password string
+
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
- Password string
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -64,6 +67,8 @@ type ClusterOptions struct {
// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -78,10 +83,14 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8
}
- if opt.RouteByLatency || opt.RouteRandomly {
+ if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
opt.ReadOnly = true
}
+ if opt.PoolSize == 0 {
+ opt.PoolSize = 5 * runtime.NumCPU()
+ }
+
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
@@ -125,10 +134,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
- PoolSize: opt.PoolSize,
- PoolTimeout: opt.PoolTimeout,
- IdleTimeout: opt.IdleTimeout,
-
+ PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
+ PoolTimeout: opt.PoolTimeout,
+ IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: disableIdleCheck,
TLSConfig: opt.TLSConfig,
@@ -157,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
go node.updateLatency()
}
+ if clOpt.OnNewNode != nil {
+ clOpt.OnNewNode(node.Client)
+ }
+
return &node
}
@@ -228,8 +242,6 @@ type clusterNodes struct {
clusterAddrs []string
closed bool
- nodeCreateGroup singleflight.Group
-
_generation uint32 // atomic
}
@@ -332,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return node, nil
}
- v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
- node := newClusterNode(c.opt, addr)
- return node, nil
- })
-
c.mu.Lock()
defer c.mu.Unlock()
@@ -346,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
node, ok := c.allNodes[addr]
if ok {
- _ = v.(*clusterNode).Close()
return node, err
}
- node = v.(*clusterNode)
+
+ node = newClusterNode(c.opt, addr)
c.allAddrs = appendIfNotExists(c.allAddrs, addr)
- if err == nil {
- c.clusterAddrs = append(c.clusterAddrs, addr)
- }
+ c.clusterAddrs = append(c.clusterAddrs, addr)
c.allNodes[addr] = node
return node, err
@@ -429,13 +434,15 @@ func newClusterState(
createdAt: time.Now(),
}
- isLoopbackOrigin := isLoopbackAddr(origin)
+ originHost, _, _ := net.SplitHostPort(origin)
+ isLoopbackOrigin := isLoopback(originHost)
+
for _, slot := range slots {
var nodes []*clusterNode
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
- if !isLoopbackOrigin && useOriginAddr(origin, addr) {
- addr = origin
+ if !isLoopbackOrigin {
+ addr = replaceLoopbackHost(addr, originHost)
}
node, err := c.nodes.GetOrCreate(addr)
@@ -469,6 +476,33 @@ func newClusterState(
return &c, nil
}
+func replaceLoopbackHost(nodeAddr, originHost string) string {
+ nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+ if err != nil {
+ return nodeAddr
+ }
+
+ nodeIP := net.ParseIP(nodeHost)
+ if nodeIP == nil {
+ return nodeAddr
+ }
+
+ if !nodeIP.IsLoopback() {
+ return nodeAddr
+ }
+
+ // Use origin host which is not loopback and node port.
+ return net.JoinHostPort(originHost, nodePort)
+}
+
+func isLoopback(host string) bool {
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return true
+ }
+ return ip.IsLoopback()
+}
+
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) > 0 {
@@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
- break
+ return slave, nil
}
}
- return slave, nil
+
+ // All slaves are loading - use master.
+ return nodes[0], nil
}
}
@@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
return nil
}
-func (c *clusterState) IsConsistent() bool {
- if c.nodes.opt.ClusterSlots != nil {
- return true
- }
- return len(c.Masters) <= len(c.Slaves)
-}
-
//------------------------------------------------------------------------------
type clusterStateHolder struct {
load func() (*clusterState, error)
- state atomic.Value
-
- firstErrMu sync.RWMutex
- firstErr error
-
+ state atomic.Value
reloading uint32 // atomic
}
@@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
}
func (c *clusterStateHolder) Reload() (*clusterState, error) {
- state, err := c.reload()
- if err != nil {
- return nil, err
- }
- if !state.IsConsistent() {
- time.AfterFunc(time.Second, c.LazyReload)
- }
- return state, nil
-}
-
-func (c *clusterStateHolder) reload() (*clusterState, error) {
state, err := c.load()
if err != nil {
- c.firstErrMu.Lock()
- if c.firstErr == nil {
- c.firstErr = err
- }
- c.firstErrMu.Unlock()
return nil, err
}
c.state.Store(state)
@@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() {
go func() {
defer atomic.StoreUint32(&c.reloading, 0)
- for {
- state, err := c.reload()
- if err != nil {
- return
- }
- time.Sleep(100 * time.Millisecond)
- if state.IsConsistent() {
- return
- }
+ _, err := c.Reload()
+ if err != nil {
+ return
}
+ time.Sleep(100 * time.Millisecond)
}()
}
@@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
}
return state, nil
}
-
- c.firstErrMu.RLock()
- err := c.firstErr
- c.firstErrMu.RUnlock()
- if err != nil {
- return nil, err
- }
-
- return nil, errors.New("redis: cluster has no state")
+ return c.Reload()
}
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c.processTxPipeline = c.defaultProcessTxPipeline
c.init()
-
- _, _ = c.state.Reload()
- _, _ = c.cmdsInfoCache.Get()
-
if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
}
@@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
-// ReloadState reloads cluster state. It calls ClusterSlots func
+func (c *ClusterClient) init() {
+ c.cmdable.setProcessor(c.Process)
+}
+
+// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
-func (c *ClusterClient) init() {
- c.cmdable.setProcessor(c.Process)
-}
-
func (c *ClusterClient) Context() context.Context {
if c.ctx != nil {
return c.ctx
@@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int {
}
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+ args := cmd.Args()
+ if args[0] == "cluster" && args[1] == "getkeysinslot" {
+ return args[2].(int)
+ }
+
cmdInfo := c.cmdInfo(cmd.Name())
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}
@@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
}
cmdInfo := c.cmdInfo(cmd.Name())
- slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
+ slot := c.cmdSlot(cmd)
- if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
+ if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot)
return slot, node, err
@@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
if err == nil {
break
}
-
- if internal.IsRetryableError(err, true) {
+ if err != Nil {
c.state.LazyReload()
- continue
}
moved, ask, addr := internal.IsMovedError(err)
if moved || ask {
- c.state.LazyReload()
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
@@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
- if err == pool.ErrClosed {
+ if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node, err = c.slotMasterNode(slot)
if err != nil {
return err
@@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
continue
}
+ if internal.IsRetryableError(err, true) {
+ continue
+ }
+
return err
}
@@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *ClusterClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
func (c *ClusterClient) WrapProcess(
fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
@@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
if err == nil {
break
}
+ if err != Nil {
+ c.state.LazyReload()
+ }
- // If slave is loading - read from master.
+ // If slave is loading - pick another node.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
- continue
- }
-
- if internal.IsRetryableError(err, true) {
- c.state.LazyReload()
-
- // First retry the same node.
- if attempt == 0 {
- continue
- }
-
- // Second try random node.
- node, err = c.nodes.Random()
- if err != nil {
- break
- }
+ node = nil
continue
}
@@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
- c.state.LazyReload()
-
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
break
@@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
continue
}
- if err == pool.ErrClosed {
+ if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
node = nil
continue
}
+ if internal.IsRetryableError(err, true) {
+ // First retry the same node.
+ if attempt == 0 {
+ continue
+ }
+
+ // Second try random node.
+ node, err = c.nodes.Random()
+ if err != nil {
+ break
+ }
+ continue
+ }
+
break
}
@@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
acc.StaleConns += s.StaleConns
}
@@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline(
}
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
- cmdsMap, err := c.mapCmdsByNode(cmds)
+ cmdsMap := newCmdsMap()
+ err := c.mapCmdsByNode(cmds, cmdsMap)
if err != nil {
setCmdsErr(cmds, err)
return err
@@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- failedCmds := make(map[*clusterNode][]Cmder)
+ failedCmds := newCmdsMap()
+ var wg sync.WaitGroup
- for node, cmds := range cmdsMap {
- cn, err := node.Client.getConn()
- if err != nil {
- if err == pool.ErrClosed {
- c.remapCmds(cmds, failedCmds)
- } else {
- setCmdsErr(cmds, err)
+ for node, cmds := range cmdsMap.m {
+ wg.Add(1)
+ go func(node *clusterNode, cmds []Cmder) {
+ defer wg.Done()
+
+ cn, err := node.Client.getConn()
+ if err != nil {
+ if err == pool.ErrClosed {
+ c.mapCmdsByNode(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
+ return
}
- continue
- }
- err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
- if err == nil || internal.IsRedisError(err) {
- node.Client.connPool.Put(cn)
- } else {
- node.Client.connPool.Remove(cn)
- }
+ err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+ node.Client.releaseConnStrict(cn, err)
+ }(node, cmds)
}
- if len(failedCmds) == 0 {
+ wg.Wait()
+ if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
-func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
+type cmdsMap struct {
+ mu sync.Mutex
+ m map[*clusterNode][]Cmder
+}
+
+func newCmdsMap() *cmdsMap {
+ return &cmdsMap{
+ m: make(map[*clusterNode][]Cmder),
+ }
+}
+
+func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
state, err := c.state.Get()
if err != nil {
setCmdsErr(cmds, err)
- return nil, err
+ return err
}
- cmdsMap := make(map[*clusterNode][]Cmder)
cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
var node *clusterNode
@@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
node, err = state.slotMasterNode(slot)
}
if err != nil {
- return nil, err
+ return err
}
- cmdsMap[node] = append(cmdsMap[node], cmd)
+ cmdsMap.mu.Lock()
+ cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
+ cmdsMap.mu.Unlock()
}
- return cmdsMap, nil
+ return nil
}
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
return true
}
-func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
- remappedCmds, err := c.mapCmdsByNode(cmds)
- if err != nil {
- setCmdsErr(cmds, err)
- return
- }
-
- for node, cmds := range remappedCmds {
- failedCmds[node] = cmds
- }
-}
-
func (c *ClusterClient) pipelineProcessCmds(
- node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
-
- err := writeCmd(cn, cmds...)
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
if err != nil {
setCmdsErr(cmds, err)
- failedCmds[node] = cmds
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = cmds
+ failedCmds.mu.Unlock()
return err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- return c.pipelineReadCmds(cn, cmds, failedCmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return c.pipelineReadCmds(node, rd, cmds, failedCmds)
+ })
+ return err
}
func (c *ClusterClient) pipelineReadCmds(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
+ var firstErr error
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err == nil {
continue
}
@@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds(
continue
}
- return err
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], cmd)
+ failedCmds.mu.Unlock()
+ if firstErr == nil {
+ firstErr = err
+ }
}
- return nil
+ return firstErr
}
func (c *ClusterClient) checkMovedErr(
- cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+ cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
moved, ask, addr := internal.IsMovedError(err)
@@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
- failedCmds[node] = append(failedCmds[node], cmd)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], cmd)
+ failedCmds.mu.Unlock()
return true
}
@@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr(
return false
}
- failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
+ failedCmds.mu.Unlock()
return true
}
@@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
- failedCmds := make(map[*clusterNode][]Cmder)
+ failedCmds := newCmdsMap()
+ var wg sync.WaitGroup
for node, cmds := range cmdsMap {
- cn, err := node.Client.getConn()
- if err != nil {
- if err == pool.ErrClosed {
- c.remapCmds(cmds, failedCmds)
- } else {
- setCmdsErr(cmds, err)
+ wg.Add(1)
+ go func(node *clusterNode, cmds []Cmder) {
+ defer wg.Done()
+
+ cn, err := node.Client.getConn()
+ if err != nil {
+ if err == pool.ErrClosed {
+ c.mapCmdsByNode(cmds, failedCmds)
+ } else {
+ setCmdsErr(cmds, err)
+ }
+ return
}
- continue
- }
- err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
- if err == nil || internal.IsRedisError(err) {
- node.Client.connPool.Put(cn)
- } else {
- node.Client.connPool.Remove(cn)
- }
+ err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
+ node.Client.releaseConnStrict(cn, err)
+ }(node, cmds)
}
- if len(failedCmds) == 0 {
+ wg.Wait()
+ if len(failedCmds.m) == 0 {
break
}
- cmdsMap = failedCmds
+ cmdsMap = failedCmds.m
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
@@ -1429,37 +1453,41 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
}
func (c *ClusterClient) txPipelineProcessCmds(
- node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
- setCmdsErr(cmds, err)
- failedCmds[node] = cmds
- return err
- }
-
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
+ failedCmds.mu.Lock()
+ failedCmds.m[node] = cmds
+ failedCmds.mu.Unlock()
return err
}
- return pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return err
}
func (c *ClusterClient) txPipelineReadQueued(
- cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+ rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ if err := statusCmd.readReply(rd); err != nil {
return err
}
for _, cmd := range cmds {
- err := statusCmd.readReply(cn)
+ err := statusCmd.readReply(rd)
if err == nil {
continue
}
@@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued(
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
return nil
}
-func (c *ClusterClient) pubSub(channels []string) *PubSub {
+func (c *ClusterClient) pubSub() *PubSub {
var node *clusterNode
pubsub := &PubSub{
opt: c.opt.clientOptions(),
newConn: func(channels []string) (*pool.Conn, error) {
- if node == nil {
- var slot int
- if len(channels) > 0 {
- slot = hashtag.Slot(channels[0])
- } else {
- slot = -1
- }
+ if node != nil {
+ panic("node != nil")
+ }
- masterNode, err := c.slotMasterNode(slot)
- if err != nil {
- return nil, err
- }
- node = masterNode
+ slot := hashtag.Slot(channels[0])
+
+ var err error
+ node, err = c.slotMasterNode(slot)
+ if err != nil {
+ return nil, err
}
- return node.Client.newConn()
+
+ cn, err := node.Client.newConn()
+ if err != nil {
+ return nil, err
+ }
+
+ return cn, nil
},
closeConn: func(cn *pool.Conn) error {
- return node.Client.connPool.CloseConn(cn)
+ err := node.Client.connPool.CloseConn(cn)
+ node = nil
+ return err
},
}
pubsub.init()
+
return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
- pubsub := c.pubSub(channels)
+ pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
@@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
- pubsub := c.pubSub(channels)
+ pubsub := c.pubSub()
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
return pubsub
}
-func useOriginAddr(originAddr, nodeAddr string) bool {
- nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
- if err != nil {
- return false
- }
-
- nodeIP := net.ParseIP(nodeHost)
- if nodeIP == nil {
- return false
- }
-
- if !nodeIP.IsLoopback() {
- return false
- }
-
- _, originPort, err := net.SplitHostPort(originAddr)
- if err != nil {
- return false
- }
-
- return nodePort == originPort
-}
-
-func isLoopbackAddr(addr string) bool {
- host, _, err := net.SplitHostPort(addr)
- if err != nil {
- return false
- }
-
- ip := net.ParseIP(host)
- if ip == nil {
- return false
- }
-
- return ip.IsLoopback()
-}
-
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {
diff --git a/src/dma/vendor/github.com/go-redis/redis/command.go b/src/dma/vendor/github.com/go-redis/redis/command.go
index 11472bec..cb4f94b1 100644
--- a/src/dma/vendor/github.com/go-redis/redis/command.go
+++ b/src/dma/vendor/github.com/go-redis/redis/command.go
@@ -1,16 +1,14 @@
package redis
import (
- "bytes"
"fmt"
+ "net"
"strconv"
"strings"
"time"
"github.com/go-redis/redis/internal"
- "github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
- "github.com/go-redis/redis/internal/util"
)
type Cmder interface {
@@ -18,13 +16,12 @@ type Cmder interface {
Args() []interface{}
stringArg(int) string
- readReply(*pool.Conn) error
+ readReply(rd *proto.Reader) error
setErr(error)
readTimeout() *time.Duration
Err() error
- fmt.Stringer
}
func setCmdsErr(cmds []Cmder, e error) {
@@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) {
}
}
-func firstCmdsErr(cmds []Cmder) error {
+func cmdsFirstErr(cmds []Cmder) error {
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
@@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error {
return nil
}
-func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
- cn.Wb.Reset()
+func writeCmd(wr *proto.Writer, cmds ...Cmder) error {
for _, cmd := range cmds {
- if err := cn.Wb.Append(cmd.Args()); err != nil {
+ err := wr.WriteArgs(cmd.Args())
+ if err != nil {
return err
}
}
-
- _, err := cn.Write(cn.Wb.Bytes())
- return err
+ return nil
}
func cmdString(cmd Cmder, val interface{}) string {
@@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) {
return cmd.val, cmd.err
}
-func (cmd *Cmd) String() string {
- return cmdString(cmd, cmd.val)
+func (cmd *Cmd) String() (string, error) {
+ if cmd.err != nil {
+ return "", cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case string:
+ return val, nil
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for String", val)
+ return "", err
+ }
}
-func (cmd *Cmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser)
+func (cmd *Cmd) Int() (int, error) {
if cmd.err != nil {
- return cmd.err
+ return 0, cmd.err
}
- if b, ok := cmd.val.([]byte); ok {
- // Bytes must be copied, because underlying memory is reused.
- cmd.val = string(b)
+ switch val := cmd.val.(type) {
+ case int64:
+ return int(val), nil
+ case string:
+ return strconv.Atoi(val)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Int", val)
+ return 0, err
}
- return nil
+}
+
+func (cmd *Cmd) Int64() (int64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return val, nil
+ case string:
+ return strconv.ParseInt(val, 10, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Uint64() (uint64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return uint64(val), nil
+ case string:
+ return strconv.ParseUint(val, 10, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Uint64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Float64() (float64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return float64(val), nil
+ case string:
+ return strconv.ParseFloat(val, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Float64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Bool() (bool, error) {
+ if cmd.err != nil {
+ return false, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return val != 0, nil
+ case string:
+ return strconv.ParseBool(val)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Bool", val)
+ return false, err
+ }
+}
+
+func (cmd *Cmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadReply(sliceParser)
+ return cmd.err
+}
+
+// Implements proto.MultiBulkParse
+func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ vals := make([]interface{}, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(sliceParser)
+ if err != nil {
+ if err == Nil {
+ vals = append(vals, nil)
+ continue
+ }
+ if err, ok := err.(proto.RedisError); ok {
+ vals = append(vals, err)
+ continue
+ }
+ return nil, err
+ }
+
+ switch v := v.(type) {
+ case string:
+ vals = append(vals, v)
+ default:
+ vals = append(vals, v)
+ }
+ }
+ return vals, nil
}
//------------------------------------------------------------------------------
@@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *SliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *SliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(sliceParser)
+ v, cmd.err = rd.ReadArrayReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StatusCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadStringReply()
+func (cmd *StatusCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadString()
return cmd.err
}
@@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *IntCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadIntReply()
+func (cmd *IntCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadIntReply()
return cmd.err
}
@@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *DurationCmd) readReply(cn *pool.Conn) error {
+func (cmd *DurationCmd) readReply(rd *proto.Reader) error {
var n int64
- n, cmd.err = cn.Rd.ReadIntReply()
+ n, cmd.err = rd.ReadIntReply()
if cmd.err != nil {
return cmd.err
}
@@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
+func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(timeParser)
+ v, cmd.err = rd.ReadArrayReply(timeParser)
if cmd.err != nil {
return cmd.err
}
@@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d elements, expected 2", n)
+ }
+
+ sec, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ microsec, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ return time.Unix(sec, microsec*1000), nil
+}
+
//------------------------------------------------------------------------------
type BoolCmd struct {
@@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string {
return cmdString(cmd, cmd.val)
}
-var ok = []byte("OK")
-
-func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadReply(nil)
+ v, cmd.err = rd.ReadReply(nil)
// `SET key value NX` returns nil when key already exists. But
// `SETNX key value` returns bool (0/1). So convert nil to bool.
// TODO: is this okay?
@@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
case int64:
cmd.val = v == 1
return nil
- case []byte:
- cmd.val = bytes.Equal(v, ok)
+ case string:
+ cmd.val = v == "OK"
return nil
default:
cmd.err = fmt.Errorf("got %T, wanted int64 or string", v)
@@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
type StringCmd struct {
baseCmd
- val []byte
+ val string
}
var _ Cmder = (*StringCmd)(nil)
@@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd {
}
func (cmd *StringCmd) Val() string {
- return util.BytesToString(cmd.val)
+ return cmd.val
}
func (cmd *StringCmd) Result() (string, error) {
@@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) {
}
func (cmd *StringCmd) Bytes() ([]byte, error) {
- return cmd.val, cmd.err
+ return []byte(cmd.val), cmd.err
+}
+
+func (cmd *StringCmd) Int() (int, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ return strconv.Atoi(cmd.Val())
}
func (cmd *StringCmd) Int64() (int64, error) {
@@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error {
if cmd.err != nil {
return cmd.err
}
- return proto.Scan(cmd.val, val)
+ return proto.Scan([]byte(cmd.val), val)
}
func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadBytesReply()
+func (cmd *StringCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadString()
return cmd.err
}
@@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *FloatCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadFloatReply()
+func (cmd *FloatCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadFloatReply()
return cmd.err
}
@@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
return proto.ScanSlice(cmd.Val(), container)
}
-func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser)
+ v, cmd.err = rd.ReadArrayReply(stringSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ ss := make([]string, 0, n)
+ for i := int64(0); i < n; i++ {
+ s, err := rd.ReadString()
+ if err == Nil {
+ ss = append(ss, "")
+ } else if err != nil {
+ return nil, err
+ } else {
+ ss = append(ss, s)
+ }
+ }
+ return ss, nil
+}
+
//------------------------------------------------------------------------------
type BoolSliceCmd struct {
@@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser)
+ v, cmd.err = rd.ReadArrayReply(boolSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ bools := make([]bool, 0, n)
+ for i := int64(0); i < n; i++ {
+ n, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ bools = append(bools, n == 1)
+ }
+ return bools, nil
+}
+
//------------------------------------------------------------------------------
type StringStringMapCmd struct {
@@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringStringMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]string, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = value
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
type StringIntMapCmd struct {
@@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringIntMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]int64, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ n, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = n
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
type StringStructMapCmd struct {
@@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringStructMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
return nil
}
-//------------------------------------------------------------------------------
+// Implements proto.MultiBulkParse
+func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]struct{}, n)
+ for i := int64(0); i < n; i++ {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
-type XStream struct {
- Stream string
- Messages []*XMessage
+ m[key] = struct{}{}
+ }
+ return m, nil
}
+//------------------------------------------------------------------------------
+
type XMessage struct {
ID string
Values map[string]interface{}
}
+type XMessageSliceCmd struct {
+ baseCmd
+
+ val []XMessage
+}
+
+var _ Cmder = (*XMessageSliceCmd)(nil)
+
+func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
+ return &XMessageSliceCmd{
+ baseCmd: baseCmd{_args: args},
+ }
+}
+
+func (cmd *XMessageSliceCmd) Val() []XMessage {
+ return cmd.val
+}
+
+func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *XMessageSliceCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
+ var v interface{}
+ v, cmd.err = rd.ReadArrayReply(xMessageSliceParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = v.([]XMessage)
+ return nil
+}
+
+// Implements proto.MultiBulkParse
+func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ msgs := make([]XMessage, 0, n)
+ for i := int64(0); i < n; i++ {
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ v, err := rd.ReadArrayReply(stringInterfaceMapParser)
+ if err != nil {
+ return nil, err
+ }
+
+ msgs = append(msgs, XMessage{
+ ID: id,
+ Values: v.(map[string]interface{}),
+ })
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+ return msgs, nil
+}
+
+// Implements proto.MultiBulkParse
+func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]interface{}, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = value
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
+type XStream struct {
+ Stream string
+ Messages []XMessage
+}
+
type XStreamSliceCmd struct {
baseCmd
- val []*XStream
+ val []XStream
}
var _ Cmder = (*XStreamSliceCmd)(nil)
@@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd {
}
}
-func (cmd *XStreamSliceCmd) Val() []*XStream {
+func (cmd *XStreamSliceCmd) Val() []XStream {
return cmd.val
}
-func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) {
+func (cmd *XStreamSliceCmd) Result() ([]XStream, error) {
return cmd.val, cmd.err
}
@@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser)
+ v, cmd.err = rd.ReadArrayReply(xStreamSliceParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]*XStream)
+ cmd.val = v.([]XStream)
return nil
}
// Implements proto.MultiBulkParse
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- xx := make([]*XStream, n)
+ ret := make([]XStream, 0, n)
for i := int64(0); i < n; i++ {
- v, err := rd.ReadArrayReply(xStreamParser)
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d, wanted 2", n)
+ }
+
+ stream, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ v, err := rd.ReadArrayReply(xMessageSliceParser)
+ if err != nil {
+ return nil, err
+ }
+
+ ret = append(ret, XStream{
+ Stream: stream,
+ Messages: v.([]XMessage),
+ })
+ return nil, nil
+ })
if err != nil {
return nil, err
}
- xx[i] = v.(*XStream)
}
- return xx, nil
+ return ret, nil
}
-// Implements proto.MultiBulkParse
-func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
- if n != 2 {
- return nil, fmt.Errorf("got %d, wanted 2", n)
+//------------------------------------------------------------------------------
+
+type XPending struct {
+ Count int64
+ Lower string
+ Higher string
+ Consumers map[string]int64
+}
+
+type XPendingCmd struct {
+ baseCmd
+ val *XPending
+}
+
+var _ Cmder = (*XPendingCmd)(nil)
+
+func NewXPendingCmd(args ...interface{}) *XPendingCmd {
+ return &XPendingCmd{
+ baseCmd: baseCmd{_args: args},
}
+}
+
+func (cmd *XPendingCmd) Val() *XPending {
+ return cmd.val
+}
+
+func (cmd *XPendingCmd) Result() (*XPending, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *XPendingCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
+ var info interface{}
+ info, cmd.err = rd.ReadArrayReply(xPendingParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = info.(*XPending)
+ return nil
+}
- stream, err := rd.ReadStringReply()
+func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 4 {
+ return nil, fmt.Errorf("got %d, wanted 4", n)
+ }
+
+ count, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
- v, err := rd.ReadArrayReply(xMessageSliceParser)
- if err != nil {
+ lower, err := rd.ReadString()
+ if err != nil && err != Nil {
return nil, err
}
- return &XStream{
- Stream: stream,
- Messages: v.([]*XMessage),
- }, nil
+ higher, err := rd.ReadString()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ pending := &XPending{
+ Count: count,
+ Lower: lower,
+ Higher: higher,
+ }
+ _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ for i := int64(0); i < n; i++ {
+ _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d, wanted 2", n)
+ }
+
+ consumerName, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ consumerPending, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ if pending.Consumers == nil {
+ pending.Consumers = make(map[string]int64)
+ }
+ pending.Consumers[consumerName] = consumerPending
+
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+ return nil, nil
+ })
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ return pending, nil
}
//------------------------------------------------------------------------------
-type XMessageSliceCmd struct {
- baseCmd
+type XPendingExt struct {
+ Id string
+ Consumer string
+ Idle time.Duration
+ RetryCount int64
+}
- val []*XMessage
+type XPendingExtCmd struct {
+ baseCmd
+ val []XPendingExt
}
-var _ Cmder = (*XMessageSliceCmd)(nil)
+var _ Cmder = (*XPendingExtCmd)(nil)
-func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
- return &XMessageSliceCmd{
+func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd {
+ return &XPendingExtCmd{
baseCmd: baseCmd{_args: args},
}
}
-func (cmd *XMessageSliceCmd) Val() []*XMessage {
+func (cmd *XPendingExtCmd) Val() []XPendingExt {
return cmd.val
}
-func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) {
+func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) {
return cmd.val, cmd.err
}
-func (cmd *XMessageSliceCmd) String() string {
+func (cmd *XPendingExtCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
- var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
+func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
+ var info interface{}
+ info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]*XMessage)
+ cmd.val = info.([]XPendingExt)
return nil
}
-// Implements proto.MultiBulkParse
-func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- msgs := make([]*XMessage, n)
+func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ ret := make([]XPendingExt, 0, n)
for i := int64(0); i < n; i++ {
- v, err := rd.ReadArrayReply(xMessageParser)
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 4 {
+ return nil, fmt.Errorf("got %d, wanted 4", n)
+ }
+
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ consumer, err := rd.ReadString()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ idle, err := rd.ReadIntReply()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ retryCount, err := rd.ReadIntReply()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ ret = append(ret, XPendingExt{
+ Id: id,
+ Consumer: consumer,
+ Idle: time.Duration(idle) * time.Millisecond,
+ RetryCount: retryCount,
+ })
+ return nil, nil
+ })
if err != nil {
return nil, err
}
- msgs[i] = v.(*XMessage)
}
- return msgs, nil
+ return ret, nil
}
-// Implements proto.MultiBulkParse
-func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) {
- id, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
+//------------------------------------------------------------------------------
- v, err := rd.ReadArrayReply(xKeyValueParser)
- if err != nil {
- return nil, err
+//------------------------------------------------------------------------------
+
+type ZSliceCmd struct {
+ baseCmd
+
+ val []Z
+}
+
+var _ Cmder = (*ZSliceCmd)(nil)
+
+func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
+ return &ZSliceCmd{
+ baseCmd: baseCmd{_args: args},
}
+}
+
+func (cmd *ZSliceCmd) Val() []Z {
+ return cmd.val
+}
- return &XMessage{
- ID: id,
- Values: v.(map[string]interface{}),
- }, nil
+func (cmd *ZSliceCmd) Result() ([]Z, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *ZSliceCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
+ var v interface{}
+ v, cmd.err = rd.ReadArrayReply(zSliceParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = v.([]Z)
+ return nil
}
// Implements proto.MultiBulkParse
-func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) {
- values := make(map[string]interface{}, n)
+func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
+ var err error
+
+ z := &zz[i/2]
+
+ z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
- value, err := rd.ReadStringReply()
+ z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
-
- values[key] = value
}
- return values, nil
+ return zz, nil
}
//------------------------------------------------------------------------------
-type ZSliceCmd struct {
+type ZWithKeyCmd struct {
baseCmd
- val []Z
+ val ZWithKey
}
-var _ Cmder = (*ZSliceCmd)(nil)
+var _ Cmder = (*ZWithKeyCmd)(nil)
-func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
- return &ZSliceCmd{
+func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
+ return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
-func (cmd *ZSliceCmd) Val() []Z {
+func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
-func (cmd *ZSliceCmd) Result() ([]Z, error) {
- return cmd.val, cmd.err
+func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
+ return cmd.Val(), cmd.Err()
}
-func (cmd *ZSliceCmd) String() string {
+func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser)
+ v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]Z)
+ cmd.val = v.(ZWithKey)
return nil
}
+// Implements proto.MultiBulkParse
+func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 3 {
+ return nil, fmt.Errorf("got %d elements, expected 3", n)
+ }
+
+ var z ZWithKey
+ var err error
+
+ z.Key, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ z.Member, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ z.Score, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ return z, nil
+}
+
//------------------------------------------------------------------------------
type ScanCmd struct {
@@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.page)
}
-func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
- cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply()
+func (cmd *ScanCmd) readReply(rd *proto.Reader) error {
+ cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply()
return cmd.err
}
@@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
+func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser)
+ v, cmd.err = rd.ReadArrayReply(clusterSlotsParser)
if cmd.err != nil {
return cmd.err
}
@@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
+ slots := make([]ClusterSlot, n)
+ for i := 0; i < len(slots); i++ {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n < 2 {
+ err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
+ return nil, err
+ }
+
+ start, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ end, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes := make([]ClusterNode, n-2)
+ for j := 0; j < len(nodes); j++ {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n != 2 && n != 3 {
+ err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
+ return nil, err
+ }
+
+ ip, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ port, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes[j].Addr = net.JoinHostPort(ip, port)
+
+ if n == 3 {
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ nodes[j].Id = id
+ }
+ }
+
+ slots[i] = ClusterSlot{
+ Start: int(start),
+ End: int(end),
+ Nodes: nodes,
+ }
+ }
+ return slots, nil
+}
+
//------------------------------------------------------------------------------
// GeoLocation is used with GeoAdd to add geospatial location.
@@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string {
return cmdString(cmd, cmd.locations)
}
-func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
+ v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
if cmd.err != nil {
return cmd.err
}
@@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
return nil
}
+func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+ return func(rd *proto.Reader, n int64) (interface{}, error) {
+ var loc GeoLocation
+ var err error
+
+ loc.Name, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ if q.WithDist {
+ loc.Dist, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if q.WithGeoHash {
+ loc.GeoHash, err = rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if q.WithCoord {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n != 2 {
+ return nil, fmt.Errorf("got %d coordinates, expected 2", n)
+ }
+
+ loc.Longitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ loc.Latitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return &loc, nil
+ }
+}
+
+func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+ return func(rd *proto.Reader, n int64) (interface{}, error) {
+ locs := make([]GeoLocation, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(newGeoLocationParser(q))
+ if err != nil {
+ return nil, err
+ }
+ switch vv := v.(type) {
+ case string:
+ locs = append(locs, GeoLocation{
+ Name: vv,
+ })
+ case *GeoLocation:
+ locs = append(locs, *vv)
+ default:
+ return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
+ }
+ }
+ return locs, nil
+ }
+}
+
//------------------------------------------------------------------------------
type GeoPos struct {
@@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string {
return cmdString(cmd, cmd.positions)
}
-func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser)
+ v, cmd.err = rd.ReadArrayReply(geoPosSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
return nil
}
+func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ positions := make([]*GeoPos, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(geoPosParser)
+ if err != nil {
+ if err == Nil {
+ positions = append(positions, nil)
+ continue
+ }
+ return nil, err
+ }
+ switch v := v.(type) {
+ case *GeoPos:
+ positions = append(positions, v)
+ default:
+ return nil, fmt.Errorf("got %T, expected *GeoPos", v)
+ }
+ }
+ return positions, nil
+}
+
+func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
+ var pos GeoPos
+ var err error
+
+ pos.Longitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+
+ pos.Latitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+
+ return &pos, nil
+}
+
//------------------------------------------------------------------------------
type CommandInfo struct {
@@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
+func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser)
+ v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]*CommandInfo, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(commandInfoParser)
+ if err != nil {
+ return nil, err
+ }
+ vv := v.(*CommandInfo)
+ m[vv.Name] = vv
+
+ }
+ return m, nil
+}
+
+func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
+ var cmd CommandInfo
+ var err error
+
+ if n != 6 {
+ return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
+ }
+
+ cmd.Name, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ arity, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.Arity = int8(arity)
+
+ flags, err := rd.ReadReply(stringSliceParser)
+ if err != nil {
+ return nil, err
+ }
+ cmd.Flags = flags.([]string)
+
+ firstKeyPos, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.FirstKeyPos = int8(firstKeyPos)
+
+ lastKeyPos, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.LastKeyPos = int8(lastKeyPos)
+
+ stepCount, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.StepCount = int8(stepCount)
+
+ for _, flag := range cmd.Flags {
+ if flag == "readonly" {
+ cmd.ReadOnly = true
+ break
+ }
+ }
+
+ return &cmd, nil
+}
+
//------------------------------------------------------------------------------
type cmdsInfoCache struct {
diff --git a/src/dma/vendor/github.com/go-redis/redis/commands.go b/src/dma/vendor/github.com/go-redis/redis/commands.go
index dddf8acd..653e4abe 100644
--- a/src/dma/vendor/github.com/go-redis/redis/commands.go
+++ b/src/dma/vendor/github.com/go-redis/redis/commands.go
@@ -8,13 +8,6 @@ import (
"github.com/go-redis/redis/internal"
)
-func readTimeout(timeout time.Duration) time.Duration {
- if timeout == 0 {
- return 0
- }
- return timeout + 10*time.Second
-}
-
func usePrecise(dur time.Duration) bool {
return dur < time.Second || dur%time.Second != 0
}
@@ -172,16 +165,30 @@ type Cmdable interface {
SRem(key string, members ...interface{}) *IntCmd
SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd
- XAdd(stream, id string, els map[string]interface{}) *StringCmd
- XAddExt(opt *XAddExt) *StringCmd
- XLen(key string) *IntCmd
+ XAdd(a *XAddArgs) *StringCmd
+ XDel(stream string, ids ...string) *IntCmd
+ XLen(stream string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
XRevRange(stream string, start, stop string) *XMessageSliceCmd
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
- XRead(streams ...string) *XStreamSliceCmd
- XReadN(count int64, streams ...string) *XStreamSliceCmd
- XReadExt(opt *XReadExt) *XStreamSliceCmd
+ XRead(a *XReadArgs) *XStreamSliceCmd
+ XReadStreams(streams ...string) *XStreamSliceCmd
+ XGroupCreate(stream, group, start string) *StatusCmd
+ XGroupCreateMkStream(stream, group, start string) *StatusCmd
+ XGroupSetID(stream, group, start string) *StatusCmd
+ XGroupDestroy(stream, group string) *IntCmd
+ XGroupDelConsumer(stream, group, consumer string) *IntCmd
+ XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd
+ XAck(stream, group string, ids ...string) *IntCmd
+ XPending(stream, group string) *XPendingCmd
+ XPendingExt(a *XPendingExtArgs) *XPendingExtCmd
+ XClaim(a *XClaimArgs) *XMessageSliceCmd
+ XClaimJustID(a *XClaimArgs) *StringSliceCmd
+ XTrim(key string, maxLen int64) *IntCmd
+ XTrimApprox(key string, maxLen int64) *IntCmd
+ BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
+ BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
@@ -196,6 +203,8 @@ type Cmdable interface {
ZLexCount(key, min, max string) *IntCmd
ZIncrBy(key string, increment float64, member string) *FloatCmd
ZInterStore(destination string, store ZStore, keys ...string) *IntCmd
+ ZPopMax(key string, count ...int64) *ZSliceCmd
+ ZPopMin(key string, count ...int64) *ZSliceCmd
ZRange(key string, start, stop int64) *StringSliceCmd
ZRangeWithScores(key string, start, stop int64) *ZSliceCmd
ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd
@@ -223,6 +232,7 @@ type Cmdable interface {
ClientKillByFilter(keys ...string) *IntCmd
ClientList() *StringCmd
ClientPause(dur time.Duration) *BoolCmd
+ ClientID() *IntCmd
ConfigGet(parameter string) *SliceCmd
ConfigResetStat() *StatusCmd
ConfigSet(parameter, value string) *StatusCmd
@@ -260,6 +270,7 @@ type Cmdable interface {
ClusterResetHard() *StatusCmd
ClusterInfo() *StringCmd
ClusterKeySlot(key string) *IntCmd
+ ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd
ClusterCountFailureReports(nodeID string) *IntCmd
ClusterCountKeysInSlot(slot int) *IntCmd
ClusterDelSlots(slots ...int) *StatusCmd
@@ -1300,7 +1311,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
//------------------------------------------------------------------------------
-type XAddExt struct {
+type XAddArgs struct {
Stream string
MaxLen int64 // MAXLEN N
MaxLenApprox int64 // MAXLEN ~ N
@@ -1308,40 +1319,42 @@ type XAddExt struct {
Values map[string]interface{}
}
-func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
- a := make([]interface{}, 0, 6+len(opt.Values)*2)
- a = append(a, "xadd")
- a = append(a, opt.Stream)
- if opt.MaxLen > 0 {
- a = append(a, "maxlen", opt.MaxLen)
- } else if opt.MaxLenApprox > 0 {
- a = append(a, "maxlen", "~", opt.MaxLenApprox)
+func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
+ args := make([]interface{}, 0, 6+len(a.Values)*2)
+ args = append(args, "xadd")
+ args = append(args, a.Stream)
+ if a.MaxLen > 0 {
+ args = append(args, "maxlen", a.MaxLen)
+ } else if a.MaxLenApprox > 0 {
+ args = append(args, "maxlen", "~", a.MaxLenApprox)
}
- if opt.ID != "" {
- a = append(a, opt.ID)
+ if a.ID != "" {
+ args = append(args, a.ID)
} else {
- a = append(a, "*")
+ args = append(args, "*")
}
- for k, v := range opt.Values {
- a = append(a, k)
- a = append(a, v)
+ for k, v := range a.Values {
+ args = append(args, k)
+ args = append(args, v)
}
- cmd := NewStringCmd(a...)
+ cmd := NewStringCmd(args...)
c.process(cmd)
return cmd
}
-func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
- return c.XAddExt(&XAddExt{
- Stream: stream,
- ID: id,
- Values: values,
- })
+func (c *cmdable) XDel(stream string, ids ...string) *IntCmd {
+ args := []interface{}{"xdel", stream}
+ for _, id := range ids {
+ args = append(args, id)
+ }
+ cmd := NewIntCmd(args...)
+ c.process(cmd)
+ return cmd
}
-func (c *cmdable) XLen(key string) *IntCmd {
- cmd := NewIntCmd("xlen", key)
+func (c *cmdable) XLen(stream string) *IntCmd {
+ cmd := NewIntCmd("xlen", stream)
c.process(cmd)
return cmd
}
@@ -1370,55 +1383,190 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS
return cmd
}
-type XReadExt struct {
+type XReadArgs struct {
Streams []string
Count int64
Block time.Duration
}
-func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
- a := make([]interface{}, 0, 5+len(opt.Streams))
- a = append(a, "xread")
- if opt != nil {
- if opt.Count > 0 {
- a = append(a, "count")
- a = append(a, opt.Count)
- }
- if opt.Block >= 0 {
- a = append(a, "block")
- a = append(a, int64(opt.Block/time.Millisecond))
- }
+func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
+ args := make([]interface{}, 0, 5+len(a.Streams))
+ args = append(args, "xread")
+ if a.Count > 0 {
+ args = append(args, "count")
+ args = append(args, a.Count)
+ }
+ if a.Block >= 0 {
+ args = append(args, "block")
+ args = append(args, int64(a.Block/time.Millisecond))
}
- a = append(a, "streams")
- for _, s := range opt.Streams {
- a = append(a, s)
+ args = append(args, "streams")
+ for _, s := range a.Streams {
+ args = append(args, s)
}
- cmd := NewXStreamSliceCmd(a...)
+ cmd := NewXStreamSliceCmd(args...)
+ if a.Block >= 0 {
+ cmd.setReadTimeout(a.Block)
+ }
c.process(cmd)
return cmd
}
-func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
- return c.XReadExt(&XReadExt{
+func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd {
+ return c.XRead(&XReadArgs{
Streams: streams,
Block: -1,
})
}
-func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
- return c.XReadExt(&XReadExt{
- Streams: streams,
- Count: count,
- Block: -1,
- })
+func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
+ cmd := NewStatusCmd("xgroup", "create", stream, group, start)
+ c.process(cmd)
+ return cmd
}
-func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
- return c.XReadExt(&XReadExt{
- Streams: streams,
- Block: block,
- })
+func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
+ cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
+ cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd {
+ cmd := NewIntCmd("xgroup", "destroy", stream, group)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
+ cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer)
+ c.process(cmd)
+ return cmd
+}
+
+type XReadGroupArgs struct {
+ Group string
+ Consumer string
+ // List of streams and ids.
+ Streams []string
+ Count int64
+ Block time.Duration
+ NoAck bool
+}
+
+func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
+ args := make([]interface{}, 0, 8+len(a.Streams))
+ args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
+ if a.Count > 0 {
+ args = append(args, "count", a.Count)
+ }
+ if a.Block >= 0 {
+ args = append(args, "block", int64(a.Block/time.Millisecond))
+ }
+ if a.NoAck {
+ args = append(args, "noack")
+ }
+ args = append(args, "streams")
+ for _, s := range a.Streams {
+ args = append(args, s)
+ }
+
+ cmd := NewXStreamSliceCmd(args...)
+ if a.Block >= 0 {
+ cmd.setReadTimeout(a.Block)
+ }
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd {
+ args := []interface{}{"xack", stream, group}
+ for _, id := range ids {
+ args = append(args, id)
+ }
+ cmd := NewIntCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XPending(stream, group string) *XPendingCmd {
+ cmd := NewXPendingCmd("xpending", stream, group)
+ c.process(cmd)
+ return cmd
+}
+
+type XPendingExtArgs struct {
+ Stream string
+ Group string
+ Start string
+ End string
+ Count int64
+ Consumer string
+}
+
+func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd {
+ args := make([]interface{}, 0, 7)
+ args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count)
+ if a.Consumer != "" {
+ args = append(args, a.Consumer)
+ }
+ cmd := NewXPendingExtCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+type XClaimArgs struct {
+ Stream string
+ Group string
+ Consumer string
+ MinIdle time.Duration
+ Messages []string
+}
+
+func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd {
+ args := xClaimArgs(a)
+ cmd := NewXMessageSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd {
+ args := xClaimArgs(a)
+ args = append(args, "justid")
+ cmd := NewStringSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func xClaimArgs(a *XClaimArgs) []interface{} {
+ args := make([]interface{}, 0, 4+len(a.Messages))
+ args = append(args,
+ "xclaim",
+ a.Stream,
+ a.Group, a.Consumer,
+ int64(a.MinIdle/time.Millisecond))
+ for _, id := range a.Messages {
+ args = append(args, id)
+ }
+ return args
+}
+
+func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd {
+ cmd := NewIntCmd("xtrim", key, "maxlen", maxLen)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd {
+ cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen)
+ c.process(cmd)
+ return cmd
}
//------------------------------------------------------------------------------
@@ -1429,6 +1577,12 @@ type Z struct {
Member interface{}
}
+// ZWithKey represents sorted set member including the name of the key where it was popped.
+type ZWithKey struct {
+ Z
+ Key string
+}
+
// ZStore is used as an arg to ZInterStore and ZUnionStore.
type ZStore struct {
Weights []float64
@@ -1436,6 +1590,34 @@ type ZStore struct {
Aggregate string
}
+// Redis `BZPOPMAX key [key ...] timeout` command.
+func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
+ args := make([]interface{}, 1+len(keys)+1)
+ args[0] = "bzpopmax"
+ for i, key := range keys {
+ args[1+i] = key
+ }
+ args[len(args)-1] = formatSec(timeout)
+ cmd := NewZWithKeyCmd(args...)
+ cmd.setReadTimeout(timeout)
+ c.process(cmd)
+ return cmd
+}
+
+// Redis `BZPOPMIN key [key ...] timeout` command.
+func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
+ args := make([]interface{}, 1+len(keys)+1)
+ args[0] = "bzpopmin"
+ for i, key := range keys {
+ args[1+i] = key
+ }
+ args[len(args)-1] = formatSec(timeout)
+ cmd := NewZWithKeyCmd(args...)
+ cmd.setReadTimeout(timeout)
+ c.process(cmd)
+ return cmd
+}
+
func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
for i, m := range members {
a[n+2*i] = m.Score
@@ -1574,6 +1756,46 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string)
return cmd
}
+func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd {
+ args := []interface{}{
+ "zpopmax",
+ key,
+ }
+
+ switch len(count) {
+ case 0:
+ break
+ case 1:
+ args = append(args, count[0])
+ default:
+ panic("too many arguments")
+ }
+
+ cmd := NewZSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd {
+ args := []interface{}{
+ "zpopmin",
+ key,
+ }
+
+ switch len(count) {
+ case 0:
+ break
+ case 1:
+ args = append(args, count[0])
+ default:
+ panic("too many arguments")
+ }
+
+ cmd := NewZSliceCmd(args...)
+ c.process(cmd)
+ return cmd
+}
+
func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd {
args := []interface{}{
"zrange",
@@ -1849,6 +2071,24 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
return cmd
}
+func (c *cmdable) ClientID() *IntCmd {
+ cmd := NewIntCmd("client", "id")
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) ClientUnblock(id int64) *IntCmd {
+ cmd := NewIntCmd("client", "unblock", id)
+ c.process(cmd)
+ return cmd
+}
+
+func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd {
+ cmd := NewIntCmd("client", "unblock", id, "error")
+ c.process(cmd)
+ return cmd
+}
+
// ClientSetName assigns a name to the connection.
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
cmd := NewBoolCmd("client", "setname", name)
@@ -2164,6 +2404,12 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd {
return cmd
}
+func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd {
+ cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count)
+ c.process(cmd)
+ return cmd
+}
+
func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd {
cmd := NewIntCmd("cluster", "count-failure-reports", nodeID)
c.process(cmd)
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/error.go b/src/dma/vendor/github.com/go-redis/redis/internal/error.go
index e0ff8632..34f6bd4d 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/error.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/error.go
@@ -8,9 +8,18 @@ import (
"github.com/go-redis/redis/internal/proto"
)
-func IsRetryableError(err error, retryNetError bool) bool {
- if IsNetworkError(err) {
- return retryNetError
+func IsRetryableError(err error, retryTimeout bool) bool {
+ if err == nil {
+ return false
+ }
+ if err == io.EOF {
+ return true
+ }
+ if netErr, ok := err.(net.Error); ok {
+ if netErr.Timeout() {
+ return retryTimeout
+ }
+ return true
}
s := err.Error()
if s == "ERR max number of clients reached" {
@@ -33,20 +42,13 @@ func IsRedisError(err error) bool {
return ok
}
-func IsNetworkError(err error) bool {
- if err == io.EOF {
- return true
- }
- _, ok := err.(net.Error)
- return ok
-}
-
func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
if IsRedisError(err) {
- return strings.HasPrefix(err.Error(), "READONLY ")
+ // #790
+ return IsReadOnlyError(err)
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@@ -81,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) {
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
+
+func IsReadOnlyError(err error) bool {
+ return strings.HasPrefix(err.Error(), "READONLY ")
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
index acaf3665..1095bfe5 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go
@@ -13,19 +13,21 @@ var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
- Rd *proto.Reader
- Wb *proto.WriteBuffer
+ rd *proto.Reader
+ rdLocked bool
+ wr *proto.Writer
- Inited bool
- usedAt atomic.Value
+ InitedAt time.Time
+ pooled bool
+ usedAt atomic.Value
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
- Wb: proto.NewWriteBuffer(),
}
- cn.Rd = proto.NewReader(cn.netConn)
+ cn.rd = proto.NewReader(netConn)
+ cn.wr = proto.NewWriter(netConn)
cn.SetUsedAt(time.Now())
return cn
}
@@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
- cn.Rd.Reset(netConn)
+ cn.rd.Reset(netConn)
+ cn.wr.Reset(netConn)
}
-func (cn *Conn) IsStale(timeout time.Duration) bool {
- return timeout > 0 && time.Since(cn.UsedAt()) > timeout
-}
-
-func (cn *Conn) SetReadTimeout(timeout time.Duration) {
+func (cn *Conn) setReadTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
- cn.netConn.SetReadDeadline(now.Add(timeout))
- } else {
- cn.netConn.SetReadDeadline(noDeadline)
+ return cn.netConn.SetReadDeadline(now.Add(timeout))
}
+ return cn.netConn.SetReadDeadline(noDeadline)
}
-func (cn *Conn) SetWriteTimeout(timeout time.Duration) {
+func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
now := time.Now()
cn.SetUsedAt(now)
if timeout > 0 {
- cn.netConn.SetWriteDeadline(now.Add(timeout))
- } else {
- cn.netConn.SetWriteDeadline(noDeadline)
+ return cn.netConn.SetWriteDeadline(now.Add(timeout))
}
+ return cn.netConn.SetWriteDeadline(noDeadline)
}
func (cn *Conn) Write(b []byte) (int, error) {
@@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr {
return cn.netConn.RemoteAddr()
}
+func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
+ _ = cn.setReadTimeout(timeout)
+ return fn(cn.rd)
+}
+
+func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
+ _ = cn.setWriteTimeout(timeout)
+
+ firstErr := fn(cn.wr)
+ err := cn.wr.Flush()
+ if err != nil && firstErr == nil {
+ firstErr = err
+ }
+ return firstErr
+}
+
func (cn *Conn) Close() error {
return cn.netConn.Close()
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
index cab66904..9cecee8a 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go
@@ -28,7 +28,6 @@ type Stats struct {
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
- FreeConns uint32 // deprecated - use IdleConns
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
@@ -53,6 +52,8 @@ type Options struct {
OnClose func(*Conn) error
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -63,16 +64,16 @@ type ConnPool struct {
dialErrorsNum uint32 // atomic
- lastDialError error
lastDialErrorMu sync.RWMutex
+ lastDialError error
queue chan struct{}
- connsMu sync.Mutex
- conns []*Conn
-
- idleConnsMu sync.RWMutex
- idleConns []*Conn
+ connsMu sync.Mutex
+ conns []*Conn
+ idleConns []*Conn
+ poolSize int
+ idleConnsLen int
stats Stats
@@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool {
idleConns: make([]*Conn, 0, opt.PoolSize),
}
+ for i := 0; i < opt.MinIdleConns; i++ {
+ p.checkMinIdleConns()
+ }
+
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
@@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool {
return p
}
+func (p *ConnPool) checkMinIdleConns() {
+ if p.opt.MinIdleConns == 0 {
+ return
+ }
+ if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
+ p.poolSize++
+ p.idleConnsLen++
+ go p.addIdleConn()
+ }
+}
+
+func (p *ConnPool) addIdleConn() {
+ cn, err := p.newConn(true)
+ if err != nil {
+ return
+ }
+
+ p.connsMu.Lock()
+ p.conns = append(p.conns, cn)
+ p.idleConns = append(p.idleConns, cn)
+ p.connsMu.Unlock()
+}
+
func (p *ConnPool) NewConn() (*Conn, error) {
- cn, err := p.newConn()
+ return p._NewConn(false)
+}
+
+func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
+ cn, err := p.newConn(pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
+ if pooled {
+ if p.poolSize < p.opt.PoolSize {
+ p.poolSize++
+ } else {
+ cn.pooled = false
+ }
+ }
p.connsMu.Unlock()
return cn, nil
}
-func (p *ConnPool) newConn() (*Conn, error) {
+func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
@@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) {
return nil, err
}
- return NewConn(netConn), nil
+ cn := NewConn(netConn)
+ cn.pooled = pooled
+ return cn, nil
}
func (p *ConnPool) tryDial() {
@@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) {
}
for {
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
cn := p.popIdle()
- p.idleConnsMu.Unlock()
+ p.connsMu.Unlock()
if cn == nil {
break
}
- if cn.IsStale(p.opt.IdleTimeout) {
- p.CloseConn(cn)
+ if p.isStaleConn(cn) {
+ _ = p.CloseConn(cn)
continue
}
@@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) {
atomic.AddUint32(&p.stats.Misses, 1)
- newcn, err := p.NewConn()
+ newcn, err := p._NewConn(true)
if err != nil {
p.freeTurn()
return nil, err
@@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn {
idx := len(p.idleConns) - 1
cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx]
-
+ p.idleConnsLen--
+ p.checkMinIdleConns()
return cn
}
func (p *ConnPool) Put(cn *Conn) {
- buf := cn.Rd.PeekBuffered()
- if buf != nil {
- internal.Logf("connection has unread data: %.100q", buf)
+ if !cn.pooled {
p.Remove(cn)
return
}
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
- p.idleConnsMu.Unlock()
+ p.idleConnsLen++
+ p.connsMu.Unlock()
p.freeTurn()
}
@@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) {
for i, c := range p.conns {
if c == cn {
p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ if cn.pooled {
+ p.poolSize--
+ p.checkMinIdleConns()
+ }
break
}
}
@@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error {
// Len returns total number of connections.
func (p *ConnPool) Len() int {
p.connsMu.Lock()
- l := len(p.conns)
+ n := len(p.conns)
p.connsMu.Unlock()
- return l
+ return n
}
-// FreeLen returns number of idle connections.
+// IdleLen returns number of idle connections.
func (p *ConnPool) IdleLen() int {
- p.idleConnsMu.RLock()
- l := len(p.idleConns)
- p.idleConnsMu.RUnlock()
- return l
+ p.connsMu.Lock()
+ n := p.idleConnsLen
+ p.connsMu.Unlock()
+ return n
}
func (p *ConnPool) Stats() *Stats {
@@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats {
Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
TotalConns: uint32(p.Len()),
- FreeConns: uint32(idleLen),
IdleConns: uint32(idleLen),
StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
}
@@ -349,11 +393,10 @@ func (p *ConnPool) Close() error {
}
}
p.conns = nil
- p.connsMu.Unlock()
-
- p.idleConnsMu.Lock()
+ p.poolSize = 0
p.idleConns = nil
- p.idleConnsMu.Unlock()
+ p.idleConnsLen = 0
+ p.connsMu.Unlock()
return firstErr
}
@@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn {
}
cn := p.idleConns[0]
- if !cn.IsStale(p.opt.IdleTimeout) {
+ if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+ p.idleConnsLen--
return cn
}
@@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
for {
p.getTurn()
- p.idleConnsMu.Lock()
+ p.connsMu.Lock()
cn := p.reapStaleConn()
- p.idleConnsMu.Unlock()
+ p.connsMu.Unlock()
if cn != nil {
p.removeConn(cn)
@@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) {
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
}
}
+
+func (p *ConnPool) isStaleConn(cn *Conn) bool {
+ if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
+ return false
+ }
+
+ now := time.Now()
+ if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
+ return true
+ }
+ if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge {
+ return true
+ }
+
+ return false
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
index 8c28c7b7..896b6f65 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go
@@ -9,8 +9,6 @@ import (
"github.com/go-redis/redis/internal/util"
)
-const bytesAllocLimit = 1024 * 1024 // 1mb
-
const (
ErrorReply = '-'
StatusReply = '+'
@@ -32,40 +30,23 @@ func (e RedisError) Error() string { return string(e) }
type MultiBulkParse func(*Reader, int64) (interface{}, error)
type Reader struct {
- src *bufio.Reader
- buf []byte
+ rd *bufio.Reader
+ _buf []byte
}
func NewReader(rd io.Reader) *Reader {
return &Reader{
- src: bufio.NewReader(rd),
- buf: make([]byte, 4096),
+ rd: bufio.NewReader(rd),
+ _buf: make([]byte, 64),
}
}
func (r *Reader) Reset(rd io.Reader) {
- r.src.Reset(rd)
-}
-
-func (r *Reader) PeekBuffered() []byte {
- if n := r.src.Buffered(); n != 0 {
- b, _ := r.src.Peek(n)
- return b
- }
- return nil
-}
-
-func (r *Reader) ReadN(n int) ([]byte, error) {
- b, err := readN(r.src, r.buf, n)
- if err != nil {
- return nil, err
- }
- r.buf = b
- return b, nil
+ r.rd.Reset(rd)
}
func (r *Reader) ReadLine() ([]byte, error) {
- line, isPrefix, err := r.src.ReadLine()
+ line, isPrefix, err := r.rd.ReadLine()
if err != nil {
return nil, err
}
@@ -91,11 +72,11 @@ func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
case ErrorReply:
return nil, ParseErrorReply(line)
case StatusReply:
- return parseTmpStatusReply(line), nil
+ return string(line[1:]), nil
case IntReply:
return util.ParseInt(line[1:], 10, 64)
case StringReply:
- return r.readTmpBytesReply(line)
+ return r.readStringReply(line)
case ArrayReply:
n, err := parseArrayLen(line)
if err != nil {
@@ -121,47 +102,42 @@ func (r *Reader) ReadIntReply() (int64, error) {
}
}
-func (r *Reader) ReadTmpBytesReply() ([]byte, error) {
+func (r *Reader) ReadString() (string, error) {
line, err := r.ReadLine()
if err != nil {
- return nil, err
+ return "", err
}
switch line[0] {
case ErrorReply:
- return nil, ParseErrorReply(line)
+ return "", ParseErrorReply(line)
case StringReply:
- return r.readTmpBytesReply(line)
+ return r.readStringReply(line)
case StatusReply:
- return parseTmpStatusReply(line), nil
+ return string(line[1:]), nil
+ case IntReply:
+ return string(line[1:]), nil
default:
- return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
+ return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
}
}
-func (r *Reader) ReadBytesReply() ([]byte, error) {
- b, err := r.ReadTmpBytesReply()
- if err != nil {
- return nil, err
+func (r *Reader) readStringReply(line []byte) (string, error) {
+ if isNilReply(line) {
+ return "", Nil
}
- cp := make([]byte, len(b))
- copy(cp, b)
- return cp, nil
-}
-func (r *Reader) ReadStringReply() (string, error) {
- b, err := r.ReadTmpBytesReply()
+ replyLen, err := strconv.Atoi(string(line[1:]))
if err != nil {
return "", err
}
- return string(b), nil
-}
-func (r *Reader) ReadFloatReply() (float64, error) {
- b, err := r.ReadTmpBytesReply()
+ b := make([]byte, replyLen+2)
+ _, err = io.ReadFull(r.rd, b)
if err != nil {
- return 0, err
+ return "", err
}
- return util.ParseFloat(b, 64)
+
+ return util.BytesToString(b[:replyLen]), nil
}
func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
@@ -219,7 +195,7 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) {
keys := make([]string, n)
for i := int64(0); i < n; i++ {
- key, err := r.ReadStringReply()
+ key, err := r.ReadString()
if err != nil {
return nil, 0, err
}
@@ -229,69 +205,71 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) {
return keys, cursor, err
}
-func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) {
- if isNilReply(line) {
- return nil, Nil
- }
-
- replyLen, err := strconv.Atoi(string(line[1:]))
+func (r *Reader) ReadInt() (int64, error) {
+ b, err := r.readTmpBytesReply()
if err != nil {
- return nil, err
+ return 0, err
}
+ return util.ParseInt(b, 10, 64)
+}
- b, err := r.ReadN(replyLen + 2)
+func (r *Reader) ReadUint() (uint64, error) {
+ b, err := r.readTmpBytesReply()
if err != nil {
- return nil, err
+ return 0, err
}
- return b[:replyLen], nil
+ return util.ParseUint(b, 10, 64)
}
-func (r *Reader) ReadInt() (int64, error) {
- b, err := r.ReadTmpBytesReply()
+func (r *Reader) ReadFloatReply() (float64, error) {
+ b, err := r.readTmpBytesReply()
if err != nil {
return 0, err
}
- return util.ParseInt(b, 10, 64)
+ return util.ParseFloat(b, 64)
}
-func (r *Reader) ReadUint() (uint64, error) {
- b, err := r.ReadTmpBytesReply()
+func (r *Reader) readTmpBytesReply() ([]byte, error) {
+ line, err := r.ReadLine()
if err != nil {
- return 0, err
+ return nil, err
+ }
+ switch line[0] {
+ case ErrorReply:
+ return nil, ParseErrorReply(line)
+ case StringReply:
+ return r._readTmpBytesReply(line)
+ case StatusReply:
+ return line[1:], nil
+ default:
+ return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
}
- return util.ParseUint(b, 10, 64)
}
-// --------------------------------------------------------------------
+func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
+ if isNilReply(line) {
+ return nil, Nil
+ }
-func readN(r io.Reader, b []byte, n int) ([]byte, error) {
- if n == 0 && b == nil {
- return make([]byte, 0), nil
+ replyLen, err := strconv.Atoi(string(line[1:]))
+ if err != nil {
+ return nil, err
}
- if cap(b) >= n {
- b = b[:n]
- _, err := io.ReadFull(r, b)
- return b, err
+ buf := r.buf(replyLen + 2)
+ _, err = io.ReadFull(r.rd, buf)
+ if err != nil {
+ return nil, err
}
- b = b[:cap(b)]
- pos := 0
- for pos < n {
- diff := n - len(b)
- if diff > bytesAllocLimit {
- diff = bytesAllocLimit
- }
- b = append(b, make([]byte, diff)...)
+ return buf[:replyLen], nil
+}
- nn, err := io.ReadFull(r, b[pos:])
- if err != nil {
- return nil, err
- }
- pos += nn
+func (r *Reader) buf(n int) []byte {
+ if d := n - cap(r._buf); d > 0 {
+ r._buf = append(r._buf, make([]byte, d)...)
}
-
- return b, nil
+ return r._buf[:n]
}
func isNilReply(b []byte) bool {
@@ -304,10 +282,6 @@ func ParseErrorReply(line []byte) error {
return RedisError(string(line[1:]))
}
-func parseTmpStatusReply(line []byte) []byte {
- return line[1:]
-}
-
func parseArrayLen(line []byte) (int64, error) {
if isNilReply(line) {
return 0, Nil
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
deleted file mode 100644
index 664f4c33..00000000
--- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package proto
-
-import (
- "encoding"
- "fmt"
- "strconv"
-)
-
-type WriteBuffer struct {
- b []byte
-}
-
-func NewWriteBuffer() *WriteBuffer {
- return &WriteBuffer{
- b: make([]byte, 0, 4096),
- }
-}
-
-func (w *WriteBuffer) Len() int { return len(w.b) }
-func (w *WriteBuffer) Bytes() []byte { return w.b }
-func (w *WriteBuffer) Reset() { w.b = w.b[:0] }
-
-func (w *WriteBuffer) Append(args []interface{}) error {
- w.b = append(w.b, ArrayReply)
- w.b = strconv.AppendUint(w.b, uint64(len(args)), 10)
- w.b = append(w.b, '\r', '\n')
-
- for _, arg := range args {
- if err := w.append(arg); err != nil {
- return err
- }
- }
- return nil
-}
-
-func (w *WriteBuffer) append(val interface{}) error {
- switch v := val.(type) {
- case nil:
- w.AppendString("")
- case string:
- w.AppendString(v)
- case []byte:
- w.AppendBytes(v)
- case int:
- w.AppendString(formatInt(int64(v)))
- case int8:
- w.AppendString(formatInt(int64(v)))
- case int16:
- w.AppendString(formatInt(int64(v)))
- case int32:
- w.AppendString(formatInt(int64(v)))
- case int64:
- w.AppendString(formatInt(v))
- case uint:
- w.AppendString(formatUint(uint64(v)))
- case uint8:
- w.AppendString(formatUint(uint64(v)))
- case uint16:
- w.AppendString(formatUint(uint64(v)))
- case uint32:
- w.AppendString(formatUint(uint64(v)))
- case uint64:
- w.AppendString(formatUint(v))
- case float32:
- w.AppendString(formatFloat(float64(v)))
- case float64:
- w.AppendString(formatFloat(v))
- case bool:
- if v {
- w.AppendString("1")
- } else {
- w.AppendString("0")
- }
- case encoding.BinaryMarshaler:
- b, err := v.MarshalBinary()
- if err != nil {
- return err
- }
- w.AppendBytes(b)
- default:
- return fmt.Errorf(
- "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val)
- }
- return nil
-}
-
-func (w *WriteBuffer) AppendString(s string) {
- w.b = append(w.b, StringReply)
- w.b = strconv.AppendUint(w.b, uint64(len(s)), 10)
- w.b = append(w.b, '\r', '\n')
- w.b = append(w.b, s...)
- w.b = append(w.b, '\r', '\n')
-}
-
-func (w *WriteBuffer) AppendBytes(p []byte) {
- w.b = append(w.b, StringReply)
- w.b = strconv.AppendUint(w.b, uint64(len(p)), 10)
- w.b = append(w.b, '\r', '\n')
- w.b = append(w.b, p...)
- w.b = append(w.b, '\r', '\n')
-}
-
-func formatInt(n int64) string {
- return strconv.FormatInt(n, 10)
-}
-
-func formatUint(u uint64) string {
- return strconv.FormatUint(u, 10)
-}
-
-func formatFloat(f float64) string {
- return strconv.FormatFloat(f, 'f', -1, 64)
-}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go
new file mode 100644
index 00000000..d106ce0e
--- /dev/null
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go
@@ -0,0 +1,159 @@
+package proto
+
+import (
+ "bufio"
+ "encoding"
+ "fmt"
+ "io"
+ "strconv"
+
+ "github.com/go-redis/redis/internal/util"
+)
+
+type Writer struct {
+ wr *bufio.Writer
+
+ lenBuf []byte
+ numBuf []byte
+}
+
+func NewWriter(wr io.Writer) *Writer {
+ return &Writer{
+ wr: bufio.NewWriter(wr),
+
+ lenBuf: make([]byte, 64),
+ numBuf: make([]byte, 64),
+ }
+}
+
+func (w *Writer) WriteArgs(args []interface{}) error {
+ err := w.wr.WriteByte(ArrayReply)
+ if err != nil {
+ return err
+ }
+
+ err = w.writeLen(len(args))
+ if err != nil {
+ return err
+ }
+
+ for _, arg := range args {
+ err := w.writeArg(arg)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (w *Writer) writeLen(n int) error {
+ w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10)
+ w.lenBuf = append(w.lenBuf, '\r', '\n')
+ _, err := w.wr.Write(w.lenBuf)
+ return err
+}
+
+func (w *Writer) writeArg(v interface{}) error {
+ switch v := v.(type) {
+ case nil:
+ return w.string("")
+ case string:
+ return w.string(v)
+ case []byte:
+ return w.bytes(v)
+ case int:
+ return w.int(int64(v))
+ case int8:
+ return w.int(int64(v))
+ case int16:
+ return w.int(int64(v))
+ case int32:
+ return w.int(int64(v))
+ case int64:
+ return w.int(v)
+ case uint:
+ return w.uint(uint64(v))
+ case uint8:
+ return w.uint(uint64(v))
+ case uint16:
+ return w.uint(uint64(v))
+ case uint32:
+ return w.uint(uint64(v))
+ case uint64:
+ return w.uint(v)
+ case float32:
+ return w.float(float64(v))
+ case float64:
+ return w.float(v)
+ case bool:
+ if v {
+ return w.int(1)
+ } else {
+ return w.int(0)
+ }
+ case encoding.BinaryMarshaler:
+ b, err := v.MarshalBinary()
+ if err != nil {
+ return err
+ }
+ return w.bytes(b)
+ default:
+ return fmt.Errorf(
+ "redis: can't marshal %T (implement encoding.BinaryMarshaler)", v)
+ }
+}
+
+func (w *Writer) bytes(b []byte) error {
+ err := w.wr.WriteByte(StringReply)
+ if err != nil {
+ return err
+ }
+
+ err = w.writeLen(len(b))
+ if err != nil {
+ return err
+ }
+
+ _, err = w.wr.Write(b)
+ if err != nil {
+ return err
+ }
+
+ return w.crlf()
+}
+
+func (w *Writer) string(s string) error {
+ return w.bytes(util.StringToBytes(s))
+}
+
+func (w *Writer) uint(n uint64) error {
+ w.numBuf = strconv.AppendUint(w.numBuf[:0], n, 10)
+ return w.bytes(w.numBuf)
+}
+
+func (w *Writer) int(n int64) error {
+ w.numBuf = strconv.AppendInt(w.numBuf[:0], n, 10)
+ return w.bytes(w.numBuf)
+}
+
+func (w *Writer) float(f float64) error {
+ w.numBuf = strconv.AppendFloat(w.numBuf[:0], f, 'f', -1, 64)
+ return w.bytes(w.numBuf)
+}
+
+func (w *Writer) crlf() error {
+ err := w.wr.WriteByte('\r')
+ if err != nil {
+ return err
+ }
+ return w.wr.WriteByte('\n')
+}
+
+func (w *Writer) Reset(wr io.Writer) {
+ w.wr.Reset(wr)
+}
+
+func (w *Writer) Flush() error {
+ return w.wr.Flush()
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go
deleted file mode 100644
index 3b174172..00000000
--- a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-Copyright 2013 Google Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Package singleflight provides a duplicate function call suppression
-// mechanism.
-package singleflight
-
-import "sync"
-
-// call is an in-flight or completed Do call
-type call struct {
- wg sync.WaitGroup
- val interface{}
- err error
-}
-
-// Group represents a class of work and forms a namespace in which
-// units of work can be executed with duplicate suppression.
-type Group struct {
- mu sync.Mutex // protects m
- m map[string]*call // lazily initialized
-}
-
-// Do executes and returns the results of the given function, making
-// sure that only one execution is in-flight for a given key at a
-// time. If a duplicate comes in, the duplicate caller waits for the
-// original to complete and receives the same results.
-func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
- g.mu.Lock()
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- if c, ok := g.m[key]; ok {
- g.mu.Unlock()
- c.wg.Wait()
- return c.val, c.err
- }
- c := new(call)
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
-
- c.val, c.err = fn()
- c.wg.Done()
-
- g.mu.Lock()
- delete(g.m, key)
- g.mu.Unlock()
-
- return c.val, c.err
-}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
index cd891833..1b3060eb 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go
@@ -5,3 +5,7 @@ package util
func BytesToString(b []byte) string {
return string(b)
}
+
+func StringToBytes(s string) []byte {
+ return []byte(s)
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
index 93a89c55..c9868aac 100644
--- a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
+++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go
@@ -10,3 +10,13 @@ import (
func BytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
+
+// StringToBytes converts string to byte slice.
+func StringToBytes(s string) []byte {
+ return *(*[]byte)(unsafe.Pointer(
+ &struct {
+ string
+ Cap int
+ }{s, len(s)},
+ ))
+}
diff --git a/src/dma/vendor/github.com/go-redis/redis/options.go b/src/dma/vendor/github.com/go-redis/redis/options.go
index 8a82d590..b6fabf3f 100644
--- a/src/dma/vendor/github.com/go-redis/redis/options.go
+++ b/src/dma/vendor/github.com/go-redis/redis/options.go
@@ -14,6 +14,17 @@ import (
"github.com/go-redis/redis/internal/pool"
)
+// Limiter is the interface of a rate limiter or a circuit breaker.
+type Limiter interface {
+ // Allow returns a nil if operation is allowed or an error otherwise.
+ // If operation is allowed client must report the result of operation
+ // whether is a success or a failure.
+ Allow() error
+ // ReportResult reports the result of previously allowed operation.
+ // nil indicates a success, non-nil error indicates a failure.
+ ReportResult(result error)
+}
+
type Options struct {
// The network type, either tcp or unix.
// Default is tcp.
@@ -48,7 +59,7 @@ type Options struct {
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads. If reached, commands will fail
- // with a timeout instead of blocking.
+ // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes. If reached, commands will fail
@@ -59,6 +70,12 @@ type Options struct {
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
+ // Minimum number of idle connections which is useful when establishing
+ // new connection is slow.
+ MinIdleConns int
+ // Connection age at which client retires (closes) the connection.
+ // Default is to not close aged connections.
+ MaxConnAge time.Duration
// Amount of time client waits for connection if all connections
// are busy before returning an error.
// Default is ReadTimeout + 1 second.
@@ -69,7 +86,8 @@ type Options struct {
IdleTimeout time.Duration
// Frequency of idle checks made by idle connections reaper.
// Default is 1 minute. -1 disables idle connections reaper,
- // but idle connections are still discarded by the client.
+ // but idle connections are still discarded by the client
+ // if IdleTimeout is set.
IdleCheckFrequency time.Duration
// Enables read only queries on slave nodes.
@@ -83,6 +101,9 @@ func (opt *Options) init() {
if opt.Network == "" {
opt.Network = "tcp"
}
+ if opt.Addr == "" {
+ opt.Addr = "localhost:6379"
+ }
if opt.Dialer == nil {
opt.Dialer = func() (net.Conn, error) {
netDialer := &net.Dialer{
@@ -196,6 +217,8 @@ func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{
Dialer: opt.Dialer,
PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
diff --git a/src/dma/vendor/github.com/go-redis/redis/parser.go b/src/dma/vendor/github.com/go-redis/redis/parser.go
deleted file mode 100644
index f0dc67f0..00000000
--- a/src/dma/vendor/github.com/go-redis/redis/parser.go
+++ /dev/null
@@ -1,394 +0,0 @@
-package redis
-
-import (
- "fmt"
- "net"
- "strconv"
- "time"
-
- "github.com/go-redis/redis/internal/proto"
-)
-
-// Implements proto.MultiBulkParse
-func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- vals := make([]interface{}, 0, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(sliceParser)
- if err != nil {
- if err == Nil {
- vals = append(vals, nil)
- continue
- }
- if err, ok := err.(proto.RedisError); ok {
- vals = append(vals, err)
- continue
- }
- return nil, err
- }
-
- switch v := v.(type) {
- case []byte:
- vals = append(vals, string(v))
- default:
- vals = append(vals, v)
- }
- }
- return vals, nil
-}
-
-// Implements proto.MultiBulkParse
-func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- bools := make([]bool, 0, n)
- for i := int64(0); i < n; i++ {
- n, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- bools = append(bools, n == 1)
- }
- return bools, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- ss := make([]string, 0, n)
- for i := int64(0); i < n; i++ {
- s, err := rd.ReadStringReply()
- if err == Nil {
- ss = append(ss, "")
- } else if err != nil {
- return nil, err
- } else {
- ss = append(ss, s)
- }
- }
- return ss, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]string, n/2)
- for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- value, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- m[key] = value
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]int64, n/2)
- for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- n, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
-
- m[key] = n
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]struct{}, n)
- for i := int64(0); i < n; i++ {
- key, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- m[key] = struct{}{}
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- zz := make([]Z, n/2)
- for i := int64(0); i < n; i += 2 {
- var err error
-
- z := &zz[i/2]
-
- z.Member, err = rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- z.Score, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- }
- return zz, nil
-}
-
-// Implements proto.MultiBulkParse
-func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
- slots := make([]ClusterSlot, n)
- for i := 0; i < len(slots); i++ {
- n, err := rd.ReadArrayLen()
- if err != nil {
- return nil, err
- }
- if n < 2 {
- err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
- return nil, err
- }
-
- start, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
-
- end, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
-
- nodes := make([]ClusterNode, n-2)
- for j := 0; j < len(nodes); j++ {
- n, err := rd.ReadArrayLen()
- if err != nil {
- return nil, err
- }
- if n != 2 && n != 3 {
- err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
- return nil, err
- }
-
- ip, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- port, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
-
- if n == 3 {
- id, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
- nodes[j].Id = id
- }
- }
-
- slots[i] = ClusterSlot{
- Start: int(start),
- End: int(end),
- Nodes: nodes,
- }
- }
- return slots, nil
-}
-
-func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
- return func(rd *proto.Reader, n int64) (interface{}, error) {
- var loc GeoLocation
- var err error
-
- loc.Name, err = rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
- if q.WithDist {
- loc.Dist, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- }
- if q.WithGeoHash {
- loc.GeoHash, err = rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- }
- if q.WithCoord {
- n, err := rd.ReadArrayLen()
- if err != nil {
- return nil, err
- }
- if n != 2 {
- return nil, fmt.Errorf("got %d coordinates, expected 2", n)
- }
-
- loc.Longitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- loc.Latitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
- }
-
- return &loc, nil
- }
-}
-
-func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
- return func(rd *proto.Reader, n int64) (interface{}, error) {
- locs := make([]GeoLocation, 0, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(newGeoLocationParser(q))
- if err != nil {
- return nil, err
- }
- switch vv := v.(type) {
- case []byte:
- locs = append(locs, GeoLocation{
- Name: string(vv),
- })
- case *GeoLocation:
- locs = append(locs, *vv)
- default:
- return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
- }
- }
- return locs, nil
- }
-}
-
-func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
- var pos GeoPos
- var err error
-
- pos.Longitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
-
- pos.Latitude, err = rd.ReadFloatReply()
- if err != nil {
- return nil, err
- }
-
- return &pos, nil
-}
-
-func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- positions := make([]*GeoPos, 0, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(geoPosParser)
- if err != nil {
- if err == Nil {
- positions = append(positions, nil)
- continue
- }
- return nil, err
- }
- switch v := v.(type) {
- case *GeoPos:
- positions = append(positions, v)
- default:
- return nil, fmt.Errorf("got %T, expected *GeoPos", v)
- }
- }
- return positions, nil
-}
-
-func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
- var cmd CommandInfo
- var err error
-
- if n != 6 {
- return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
- }
-
- cmd.Name, err = rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
-
- arity, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.Arity = int8(arity)
-
- flags, err := rd.ReadReply(stringSliceParser)
- if err != nil {
- return nil, err
- }
- cmd.Flags = flags.([]string)
-
- firstKeyPos, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.FirstKeyPos = int8(firstKeyPos)
-
- lastKeyPos, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.LastKeyPos = int8(lastKeyPos)
-
- stepCount, err := rd.ReadIntReply()
- if err != nil {
- return nil, err
- }
- cmd.StepCount = int8(stepCount)
-
- for _, flag := range cmd.Flags {
- if flag == "readonly" {
- cmd.ReadOnly = true
- break
- }
- }
-
- return &cmd, nil
-}
-
-// Implements proto.MultiBulkParse
-func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- m := make(map[string]*CommandInfo, n)
- for i := int64(0); i < n; i++ {
- v, err := rd.ReadReply(commandInfoParser)
- if err != nil {
- return nil, err
- }
- vv := v.(*CommandInfo)
- m[vv.Name] = vv
-
- }
- return m, nil
-}
-
-// Implements proto.MultiBulkParse
-func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
- if n != 2 {
- return nil, fmt.Errorf("got %d elements, expected 2", n)
- }
-
- sec, err := rd.ReadInt()
- if err != nil {
- return nil, err
- }
-
- microsec, err := rd.ReadInt()
- if err != nil {
- return nil, err
- }
-
- return time.Unix(sec, microsec*1000), nil
-}
diff --git a/src/dma/vendor/github.com/go-redis/redis/pipeline.go b/src/dma/vendor/github.com/go-redis/redis/pipeline.go
index ba852283..b3a8844a 100644
--- a/src/dma/vendor/github.com/go-redis/redis/pipeline.go
+++ b/src/dma/vendor/github.com/go-redis/redis/pipeline.go
@@ -10,6 +10,7 @@ type pipelineExecer func([]Cmder) error
type Pipeliner interface {
StatefulCmdable
+ Do(args ...interface{}) *Cmd
Process(cmd Cmder) error
Close() error
Discard() error
@@ -31,6 +32,12 @@ type Pipeline struct {
closed bool
}
+func (c *Pipeline) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ _ = c.Process(cmd)
+ return cmd
+}
+
// Process queues the cmd for later execution.
func (c *Pipeline) Process(cmd Cmder) error {
c.mu.Lock()
diff --git a/src/dma/vendor/github.com/go-redis/redis/pubsub.go b/src/dma/vendor/github.com/go-redis/redis/pubsub.go
index 2cfcd150..0afb47cd 100644
--- a/src/dma/vendor/github.com/go-redis/redis/pubsub.go
+++ b/src/dma/vendor/github.com/go-redis/redis/pubsub.go
@@ -1,15 +1,19 @@
package redis
import (
+ "errors"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/internal"
"github.com/go-redis/redis/internal/pool"
+ "github.com/go-redis/redis/internal/proto"
)
-// PubSub implements Pub/Sub commands as described in
+var errPingTimeout = errors.New("redis: ping timeout")
+
+// PubSub implements Pub/Sub commands bas described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
@@ -46,15 +50,17 @@ func (c *PubSub) conn() (*pool.Conn, error) {
return cn, err
}
-func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
+func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
if c.closed {
return nil, pool.ErrClosed
}
-
if c.cn != nil {
return c.cn, nil
}
+ channels := mapKeys(c.channels)
+ channels = append(channels, newChannels...)
+
cn, err := c.newConn(channels)
if err != nil {
return nil, err
@@ -69,20 +75,24 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
return cn, nil
}
+func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
+ return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+}
+
func (c *PubSub) resubscribe(cn *pool.Conn) error {
var firstErr error
if len(c.channels) > 0 {
- channels := mapKeys(c.channels)
- err := c._subscribe(cn, "subscribe", channels...)
+ err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
if err != nil && firstErr == nil {
firstErr = err
}
}
if len(c.patterns) > 0 {
- patterns := mapKeys(c.patterns)
- err := c._subscribe(cn, "psubscribe", patterns...)
+ err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
if err != nil && firstErr == nil {
firstErr = err
}
@@ -101,51 +111,48 @@ func mapKeys(m map[string]struct{}) []string {
return s
}
-func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
- args := make([]interface{}, 1+len(channels))
- args[0] = redisCmd
- for i, channel := range channels {
- args[1+i] = channel
+func (c *PubSub) _subscribe(
+ cn *pool.Conn, redisCmd string, channels []string,
+) error {
+ args := make([]interface{}, 0, 1+len(channels))
+ args = append(args, redisCmd)
+ for _, channel := range channels {
+ args = append(args, channel)
}
cmd := NewSliceCmd(args...)
-
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- return writeCmd(cn, cmd)
+ return c.writeCmd(cn, cmd)
}
-func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
+func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
c.mu.Lock()
- c._releaseConn(cn, err)
+ c._releaseConn(cn, err, allowTimeout)
c.mu.Unlock()
}
-func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
+func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
if c.cn != cn {
return
}
- if internal.IsBadConn(err, true) {
- c._reconnect()
+ if internal.IsBadConn(err, allowTimeout) {
+ c._reconnect(err)
}
}
-func (c *PubSub) _closeTheCn() error {
- var err error
- if c.cn != nil {
- err = c.closeConn(c.cn)
- c.cn = nil
- }
- return err
-}
-
-func (c *PubSub) reconnect() {
- c.mu.Lock()
- c._reconnect()
- c.mu.Unlock()
+func (c *PubSub) _reconnect(reason error) {
+ _ = c._closeTheCn(reason)
+ _, _ = c._conn(nil)
}
-func (c *PubSub) _reconnect() {
- _ = c._closeTheCn()
- _, _ = c._conn(nil)
+func (c *PubSub) _closeTheCn(reason error) error {
+ if c.cn == nil {
+ return nil
+ }
+ if !c.closed {
+ internal.Logf("redis: discarding bad PubSub connection: %s", reason)
+ }
+ err := c.closeConn(c.cn)
+ c.cn = nil
+ return err
}
func (c *PubSub) Close() error {
@@ -158,7 +165,7 @@ func (c *PubSub) Close() error {
c.closed = true
close(c.exit)
- err := c._closeTheCn()
+ err := c._closeTheCn(pool.ErrClosed)
return err
}
@@ -172,8 +179,8 @@ func (c *PubSub) Subscribe(channels ...string) error {
if c.channels == nil {
c.channels = make(map[string]struct{})
}
- for _, channel := range channels {
- c.channels[channel] = struct{}{}
+ for _, s := range channels {
+ c.channels[s] = struct{}{}
}
return err
}
@@ -188,8 +195,8 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
if c.patterns == nil {
c.patterns = make(map[string]struct{})
}
- for _, pattern := range patterns {
- c.patterns[pattern] = struct{}{}
+ for _, s := range patterns {
+ c.patterns[s] = struct{}{}
}
return err
}
@@ -200,10 +207,10 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
- err := c.subscribe("unsubscribe", channels...)
for _, channel := range channels {
delete(c.channels, channel)
}
+ err := c.subscribe("unsubscribe", channels...)
return err
}
@@ -213,10 +220,10 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
- err := c.subscribe("punsubscribe", patterns...)
for _, pattern := range patterns {
delete(c.patterns, pattern)
}
+ err := c.subscribe("punsubscribe", patterns...)
return err
}
@@ -226,8 +233,8 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
return err
}
- err = c._subscribe(cn, redisCmd, channels...)
- c._releaseConn(cn, err)
+ err = c._subscribe(cn, redisCmd, channels)
+ c._releaseConn(cn, err, false)
return err
}
@@ -243,9 +250,8 @@ func (c *PubSub) Ping(payload ...string) error {
return err
}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- err = writeCmd(cn, cmd)
- c.releaseConn(cn, err)
+ err = c.writeCmd(cn, cmd)
+ c.releaseConn(cn, err, false)
return err
}
@@ -336,9 +342,11 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
return nil, err
}
- cn.SetReadTimeout(timeout)
- err = c.cmd.readReply(cn)
- c.releaseConn(cn, err)
+ err = cn.WithReader(timeout, func(rd *proto.Reader) error {
+ return c.cmd.readReply(rd)
+ })
+
+ c.releaseConn(cn, err, timeout > 0)
if err != nil {
return nil, err
}
@@ -432,21 +440,26 @@ func (c *PubSub) initChannel() {
timer := time.NewTimer(timeout)
timer.Stop()
- var hasPing bool
+ healthy := true
for {
timer.Reset(timeout)
select {
case <-c.ping:
- hasPing = true
+ healthy = true
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
- if hasPing {
- hasPing = false
- _ = c.Ping()
+ pingErr := c.Ping()
+ if healthy {
+ healthy = false
} else {
- c.reconnect()
+ if pingErr == nil {
+ pingErr = errPingTimeout
+ }
+ c.mu.Lock()
+ c._reconnect(pingErr)
+ c.mu.Unlock()
}
case <-c.exit:
return
diff --git a/src/dma/vendor/github.com/go-redis/redis/redis.go b/src/dma/vendor/github.com/go-redis/redis/redis.go
index c0f142cc..aca30648 100644
--- a/src/dma/vendor/github.com/go-redis/redis/redis.go
+++ b/src/dma/vendor/github.com/go-redis/redis/redis.go
@@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) {
type baseClient struct {
opt *Options
connPool pool.Pooler
+ limiter Limiter
process func(Cmder) error
processPipeline func([]Cmder) error
@@ -50,7 +51,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
return nil, err
}
- if !cn.Inited {
+ if cn.InitedAt.IsZero() {
if err := c.initConn(cn); err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
@@ -61,12 +62,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) {
}
func (c *baseClient) getConn() (*pool.Conn, error) {
+ if c.limiter != nil {
+ err := c.limiter.Allow()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ cn, err := c._getConn()
+ if err != nil {
+ if c.limiter != nil {
+ c.limiter.ReportResult(err)
+ }
+ return nil, err
+ }
+ return cn, nil
+}
+
+func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get()
if err != nil {
return nil, err
}
- if !cn.Inited {
+ if cn.InitedAt.IsZero() {
err := c.initConn(cn)
if err != nil {
c.connPool.Remove(cn)
@@ -77,18 +96,32 @@ func (c *baseClient) getConn() (*pool.Conn, error) {
return cn, nil
}
-func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool {
+func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
+ if c.limiter != nil {
+ c.limiter.ReportResult(err)
+ }
+
if internal.IsBadConn(err, false) {
c.connPool.Remove(cn)
- return false
+ } else {
+ c.connPool.Put(cn)
+ }
+}
+
+func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
+ if c.limiter != nil {
+ c.limiter.ReportResult(err)
}
- c.connPool.Put(cn)
- return true
+ if err == nil || internal.IsRedisError(err) {
+ c.connPool.Put(cn)
+ } else {
+ c.connPool.Remove(cn)
+ }
}
func (c *baseClient) initConn(cn *pool.Conn) error {
- cn.Inited = true
+ cn.InitedAt = time.Now()
if c.opt.Password == "" &&
c.opt.DB == 0 &&
@@ -123,8 +156,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
return nil
}
+// Do creates a Cmd from the args and processes the cmd.
+func (c *baseClient) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ _ = c.Process(cmd)
+ return cmd
+}
+
// WrapProcess wraps function that processes Redis commands.
-func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
+func (c *baseClient) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
c.process = fn(c.process)
}
@@ -147,8 +189,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmd); err != nil {
+ err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmd)
+ })
+ if err != nil {
c.releaseConn(cn, err)
cmd.setErr(err)
if internal.IsRetryableError(err, true) {
@@ -157,8 +201,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error {
return err
}
- cn.SetReadTimeout(c.cmdTimeout(cmd))
- err = cmd.readReply(cn)
+ err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
+ return cmd.readReply(rd)
+ })
c.releaseConn(cn, err)
if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
continue
@@ -176,7 +221,11 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration {
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
if timeout := cmd.readTimeout(); timeout != nil {
- return readTimeout(*timeout)
+ t := *timeout
+ if t == 0 {
+ return 0
+ }
+ return t + 10*time.Second
}
return c.opt.ReadTimeout
}
@@ -232,35 +281,33 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e
}
canRetry, err := p(cn, cmds)
-
- if err == nil || internal.IsRedisError(err) {
- c.connPool.Put(cn)
- break
- }
- c.connPool.Remove(cn)
+ c.releaseConnStrict(cn, err)
if !canRetry || !internal.IsRetryableError(err, true) {
break
}
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := writeCmd(cn, cmds...); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return writeCmd(wr, cmds...)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
return true, err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
- return true, pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ return pipelineReadCmds(rd, cmds)
+ })
+ return true, err
}
-func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
+func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
for _, cmd := range cmds {
- err := cmd.readReply(cn)
+ err := cmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
}
@@ -269,47 +316,50 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error {
}
func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
- cn.SetWriteTimeout(c.opt.WriteTimeout)
- if err := txPipelineWriteMulti(cn, cmds); err != nil {
+ err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+ return txPipelineWriteMulti(wr, cmds)
+ })
+ if err != nil {
setCmdsErr(cmds, err)
return true, err
}
- // Set read timeout for all commands.
- cn.SetReadTimeout(c.opt.ReadTimeout)
-
- if err := c.txPipelineReadQueued(cn, cmds); err != nil {
- setCmdsErr(cmds, err)
- return false, err
- }
-
- return false, pipelineReadCmds(cn, cmds)
+ err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+ err := txPipelineReadQueued(rd, cmds)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return err
+ }
+ return pipelineReadCmds(rd, cmds)
+ })
+ return false, err
}
-func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
multiExec := make([]Cmder, 0, len(cmds)+2)
multiExec = append(multiExec, NewStatusCmd("MULTI"))
multiExec = append(multiExec, cmds...)
multiExec = append(multiExec, NewSliceCmd("EXEC"))
- return writeCmd(cn, multiExec...)
+ return writeCmd(wr, multiExec...)
}
-func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error {
+func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
// Parse queued replies.
var statusCmd StatusCmd
- if err := statusCmd.readReply(cn); err != nil {
+ err := statusCmd.readReply(rd)
+ if err != nil {
return err
}
- for _ = range cmds {
- err := statusCmd.readReply(cn)
+ for range cmds {
+ err = statusCmd.readReply(rd)
if err != nil && !internal.IsRedisError(err) {
return err
}
}
// Parse number of replies.
- line, err := cn.Rd.ReadLine()
+ line, err := rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
@@ -373,12 +423,12 @@ func (c *Client) WithContext(ctx context.Context) *Client {
if ctx == nil {
panic("nil context")
}
- c2 := c.copy()
+ c2 := c.clone()
c2.ctx = ctx
return c2
}
-func (c *Client) copy() *Client {
+func (c *Client) clone() *Client {
cp := *c
cp.init()
return &cp
@@ -389,6 +439,11 @@ func (c *Client) Options() *Options {
return c.opt
}
+func (c *Client) SetLimiter(l Limiter) *Client {
+ c.limiter = l
+ return c
+}
+
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
@@ -437,6 +492,30 @@ func (c *Client) pubSub() *PubSub {
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
+// Note that this method does not wait on a response from Redis, so the
+// subscription may not be active immediately. To force the connection to wait,
+// you may call the Receive() method on the returned *PubSub like so:
+//
+// sub := client.Subscribe(queryResp)
+// iface, err := sub.Receive()
+// if err != nil {
+// // handle error
+// }
+//
+// // Should be *Subscription, but others are possible if other actions have been
+// // taken on sub since it was created.
+// switch iface.(type) {
+// case *Subscription:
+// // subscribe succeeded
+// case *Message:
+// // received first message
+// case *Pong:
+// // pong received
+// default:
+// // handle error
+// }
+//
+// ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub()
if len(channels) > 0 {
diff --git a/src/dma/vendor/github.com/go-redis/redis/result.go b/src/dma/vendor/github.com/go-redis/redis/result.go
index e086e8e3..e438f260 100644
--- a/src/dma/vendor/github.com/go-redis/redis/result.go
+++ b/src/dma/vendor/github.com/go-redis/redis/result.go
@@ -53,7 +53,7 @@ func NewBoolResult(val bool, err error) *BoolCmd {
// NewStringResult returns a StringCmd initialised with val and err for testing
func NewStringResult(val string, err error) *StringCmd {
var cmd StringCmd
- cmd.val = []byte(val)
+ cmd.val = val
cmd.setErr(err)
return &cmd
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/ring.go b/src/dma/vendor/github.com/go-redis/redis/ring.go
index ef855115..250e5f64 100644
--- a/src/dma/vendor/github.com/go-redis/redis/ring.go
+++ b/src/dma/vendor/github.com/go-redis/redis/ring.go
@@ -68,6 +68,8 @@ type RingOptions struct {
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options {
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
@@ -315,12 +319,12 @@ func (c *ringShards) Close() error {
//------------------------------------------------------------------------------
-// Ring is a Redis client that uses constistent hashing to distribute
+// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
-// the ring. When shard comes online it is added back to the ring. This
+// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
@@ -338,6 +342,7 @@ type Ring struct {
shards *ringShards
cmdsInfoCache *cmdsInfoCache
+ process func(Cmder) error
processPipeline func([]Cmder) error
}
@@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring {
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
+ ring.process = ring.defaultProcess
ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
@@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats {
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
}
return &acc
}
@@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
return c.shards.GetByKey(firstKey)
}
-func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
- c.ForEachShard(func(c *Client) error {
- c.WrapProcess(fn)
- return nil
- })
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
+func (c *Ring) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
+ c.process = fn(c.process)
}
func (c *Ring) Process(cmd Cmder) error {
- shard, err := c.cmdShard(cmd)
- if err != nil {
- cmd.setErr(err)
- return err
+ return c.process(cmd)
+}
+
+func (c *Ring) defaultProcess(cmd Cmder) error {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
+ shard, err := c.cmdShard(cmd)
+ if err != nil {
+ cmd.setErr(err)
+ return err
+ }
+
+ err = shard.Client.Process(cmd)
+ if err == nil {
+ return nil
+ }
+ if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+ return err
+ }
}
- return shard.Client.Process(cmd)
+ return cmd.Err()
}
func (c *Ring) Pipeline() Pipeliner {
@@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
time.Sleep(c.retryBackoff(attempt))
}
+ var mu sync.Mutex
var failedCmdsMap map[string][]Cmder
+ var wg sync.WaitGroup
for hash, cmds := range cmdsMap {
- shard, err := c.shards.GetByHash(hash)
- if err != nil {
- setCmdsErr(cmds, err)
- continue
- }
+ wg.Add(1)
+ go func(hash string, cmds []Cmder) {
+ defer wg.Done()
+
+ shard, err := c.shards.GetByHash(hash)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
- cn, err := shard.Client.getConn()
- if err != nil {
- setCmdsErr(cmds, err)
- continue
- }
+ cn, err := shard.Client.getConn()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
- canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
- if err == nil || internal.IsRedisError(err) {
- shard.Client.connPool.Put(cn)
- continue
- }
- shard.Client.connPool.Remove(cn)
+ canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
+ shard.Client.releaseConnStrict(cn, err)
- if canRetry && internal.IsRetryableError(err, true) {
- if failedCmdsMap == nil {
- failedCmdsMap = make(map[string][]Cmder)
+ if canRetry && internal.IsRetryableError(err, true) {
+ mu.Lock()
+ if failedCmdsMap == nil {
+ failedCmdsMap = make(map[string][]Cmder)
+ }
+ failedCmdsMap[hash] = cmds
+ mu.Unlock()
}
- failedCmdsMap[hash] = cmds
- }
+ }(hash, cmds)
}
+ wg.Wait()
if len(failedCmdsMap) == 0 {
break
}
cmdsMap = failedCmdsMap
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *Ring) TxPipeline() Pipeliner {
diff --git a/src/dma/vendor/github.com/go-redis/redis/sentinel.go b/src/dma/vendor/github.com/go-redis/redis/sentinel.go
index 12c29a71..7cbb90bd 100644
--- a/src/dma/vendor/github.com/go-redis/redis/sentinel.go
+++ b/src/dma/vendor/github.com/go-redis/redis/sentinel.go
@@ -29,13 +29,17 @@ type FailoverOptions struct {
Password string
DB int
- MaxRetries int
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
@@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
},
}
c.baseClient.init()
- c.setProcessor(c.Process)
+ c.cmdable.setProcessor(c.Process)
return &c
}
@@ -115,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
return c
}
-func (c *SentinelClient) PubSub() *PubSub {
+func (c *SentinelClient) pubSub() *PubSub {
pubsub := &PubSub{
opt: c.opt,
@@ -128,14 +132,52 @@ func (c *SentinelClient) PubSub() *PubSub {
return pubsub
}
+// Subscribe subscribes the client to the specified channels.
+// Channels can be omitted to create empty subscription.
+func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub()
+ if len(channels) > 0 {
+ _ = pubsub.Subscribe(channels...)
+ }
+ return pubsub
+}
+
+// PSubscribe subscribes the client to the given patterns.
+// Patterns can be omitted to create empty subscription.
+func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
+ pubsub := c.pubSub()
+ if len(channels) > 0 {
+ _ = pubsub.PSubscribe(channels...)
+ }
+ return pubsub
+}
+
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
- cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
+ cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
c.Process(cmd)
return cmd
}
func (c *SentinelClient) Sentinels(name string) *SliceCmd {
- cmd := NewSliceCmd("SENTINEL", "sentinels", name)
+ cmd := NewSliceCmd("sentinel", "sentinels", name)
+ c.Process(cmd)
+ return cmd
+}
+
+// Failover forces a failover as if the master was not reachable, and without
+// asking for agreement to other Sentinels.
+func (c *SentinelClient) Failover(name string) *StatusCmd {
+ cmd := NewStatusCmd("sentinel", "failover", name)
+ c.Process(cmd)
+ return cmd
+}
+
+// Reset resets all the masters with matching name. The pattern argument is a
+// glob-style pattern. The reset process clears any previous state in a master
+// (including a failover in progress), and removes every slave and sentinel
+// already discovered and associated with the master.
+func (c *SentinelClient) Reset(pattern string) *IntCmd {
+ cmd := NewIntCmd("sentinel", "reset", pattern)
c.Process(cmd)
return cmd
}
@@ -152,79 +194,81 @@ type sentinelFailover struct {
masterName string
_masterAddr string
sentinel *SentinelClient
+ pubsub *PubSub
}
-func (d *sentinelFailover) Close() error {
- return d.resetSentinel()
+func (c *sentinelFailover) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.sentinel != nil {
+ return c.closeSentinel()
+ }
+ return nil
}
-func (d *sentinelFailover) Pool() *pool.ConnPool {
- d.poolOnce.Do(func() {
- d.opt.Dialer = d.dial
- d.pool = newConnPool(d.opt)
+func (c *sentinelFailover) Pool() *pool.ConnPool {
+ c.poolOnce.Do(func() {
+ c.opt.Dialer = c.dial
+ c.pool = newConnPool(c.opt)
})
- return d.pool
+ return c.pool
}
-func (d *sentinelFailover) dial() (net.Conn, error) {
- addr, err := d.MasterAddr()
+func (c *sentinelFailover) dial() (net.Conn, error) {
+ addr, err := c.MasterAddr()
if err != nil {
return nil, err
}
- return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
+ return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
}
-func (d *sentinelFailover) MasterAddr() (string, error) {
- d.mu.Lock()
- defer d.mu.Unlock()
-
- addr, err := d.masterAddr()
+func (c *sentinelFailover) MasterAddr() (string, error) {
+ addr, err := c.masterAddr()
if err != nil {
return "", err
}
- d._switchMaster(addr)
-
+ c.switchMaster(addr)
return addr, nil
}
-func (d *sentinelFailover) masterAddr() (string, error) {
- // Try last working sentinel.
- if d.sentinel != nil {
- addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
- if err == nil {
- addr := net.JoinHostPort(addr[0], addr[1])
- return addr, nil
- }
-
- internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
- d.masterName, err)
- d._resetSentinel()
+func (c *sentinelFailover) masterAddr() (string, error) {
+ addr := c.getMasterAddr()
+ if addr != "" {
+ return addr, nil
}
- for i, sentinelAddr := range d.sentinelAddrs {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(&Options{
Addr: sentinelAddr,
- DialTimeout: d.opt.DialTimeout,
- ReadTimeout: d.opt.ReadTimeout,
- WriteTimeout: d.opt.WriteTimeout,
+ MaxRetries: c.opt.MaxRetries,
+
+ DialTimeout: c.opt.DialTimeout,
+ ReadTimeout: c.opt.ReadTimeout,
+ WriteTimeout: c.opt.WriteTimeout,
- PoolSize: d.opt.PoolSize,
- PoolTimeout: d.opt.PoolTimeout,
- IdleTimeout: d.opt.IdleTimeout,
+ PoolSize: c.opt.PoolSize,
+ PoolTimeout: c.opt.PoolTimeout,
+ IdleTimeout: c.opt.IdleTimeout,
+ IdleCheckFrequency: c.opt.IdleCheckFrequency,
+
+ TLSConfig: c.opt.TLSConfig,
})
- masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
+ masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
if err != nil {
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
- d.masterName, err)
- sentinel.Close()
+ c.masterName, err)
+ _ = sentinel.Close()
continue
}
// Push working sentinel to the top.
- d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
- d.setSentinel(sentinel)
+ c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
+ c.setSentinel(sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
@@ -233,17 +277,41 @@ func (d *sentinelFailover) masterAddr() (string, error) {
return "", errors.New("redis: all sentinels are unreachable")
}
-func (c *sentinelFailover) switchMaster(addr string) {
- c.mu.Lock()
- c._switchMaster(addr)
- c.mu.Unlock()
+func (c *sentinelFailover) getMasterAddr() string {
+ c.mu.RLock()
+ sentinel := c.sentinel
+ c.mu.RUnlock()
+
+ if sentinel == nil {
+ return ""
+ }
+
+ addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
+ if err != nil {
+ internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
+ c.masterName, err)
+ c.mu.Lock()
+ if c.sentinel == sentinel {
+ c.closeSentinel()
+ }
+ c.mu.Unlock()
+ return ""
+ }
+
+ return net.JoinHostPort(addr[0], addr[1])
}
-func (c *sentinelFailover) _switchMaster(addr string) {
- if c._masterAddr == addr {
+func (c *sentinelFailover) switchMaster(addr string) {
+ c.mu.RLock()
+ masterAddr := c._masterAddr
+ c.mu.RUnlock()
+ if masterAddr == addr {
return
}
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
internal.Logf("sentinel: new master=%q addr=%q",
c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
@@ -252,32 +320,36 @@ func (c *sentinelFailover) _switchMaster(addr string) {
c._masterAddr = addr
}
-func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
- d.discoverSentinels(sentinel)
- d.sentinel = sentinel
- go d.listen(sentinel)
+func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
+ c.discoverSentinels(sentinel)
+ c.sentinel = sentinel
+
+ c.pubsub = sentinel.Subscribe("+switch-master")
+ go c.listen(c.pubsub)
}
-func (d *sentinelFailover) resetSentinel() error {
- var err error
- d.mu.Lock()
- if d.sentinel != nil {
- err = d._resetSentinel()
+func (c *sentinelFailover) closeSentinel() error {
+ var firstErr error
+
+ err := c.pubsub.Close()
+ if err != nil && firstErr == err {
+ firstErr = err
}
- d.mu.Unlock()
- return err
-}
+ c.pubsub = nil
-func (d *sentinelFailover) _resetSentinel() error {
- err := d.sentinel.Close()
- d.sentinel = nil
- return err
+ err = c.sentinel.Close()
+ if err != nil && firstErr == err {
+ firstErr = err
+ }
+ c.sentinel = nil
+
+ return firstErr
}
-func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
- sentinels, err := sentinel.Sentinels(d.masterName).Result()
+func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
+ sentinels, err := sentinel.Sentinels(c.masterName).Result()
if err != nil {
- internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
+ internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
return
}
for _, sentinel := range sentinels {
@@ -286,49 +358,33 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
key := vals[i].(string)
if key == "name" {
sentinelAddr := vals[i+1].(string)
- if !contains(d.sentinelAddrs, sentinelAddr) {
- internal.Logf(
- "sentinel: discovered new sentinel=%q for master=%q",
- sentinelAddr, d.masterName,
- )
- d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
+ if !contains(c.sentinelAddrs, sentinelAddr) {
+ internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
+ sentinelAddr, c.masterName)
+ c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
}
}
}
}
}
-func (d *sentinelFailover) listen(sentinel *SentinelClient) {
- pubsub := sentinel.PubSub()
- defer pubsub.Close()
-
- err := pubsub.Subscribe("+switch-master")
- if err != nil {
- internal.Logf("sentinel: Subscribe failed: %s", err)
- d.resetSentinel()
- return
- }
-
+func (c *sentinelFailover) listen(pubsub *PubSub) {
+ ch := pubsub.Channel()
for {
- msg, err := pubsub.ReceiveMessage()
- if err != nil {
- if err == pool.ErrClosed {
- d.resetSentinel()
- return
- }
- internal.Logf("sentinel: ReceiveMessage failed: %s", err)
- continue
+ msg, ok := <-ch
+ if !ok {
+ break
}
switch msg.Channel {
case "+switch-master":
parts := strings.Split(msg.Payload, " ")
- if parts[0] != d.masterName {
+ if parts[0] != c.masterName {
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
- d.switchMaster(addr)
+ c.switchMaster(addr)
}
}
}
diff --git a/src/dma/vendor/github.com/go-redis/redis/tx.go b/src/dma/vendor/github.com/go-redis/redis/tx.go
index 6a7da99d..fb3e6331 100644
--- a/src/dma/vendor/github.com/go-redis/redis/tx.go
+++ b/src/dma/vendor/github.com/go-redis/redis/tx.go
@@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx {
return &tx
}
-// Watch prepares a transcaction and marks the keys to be watched
+// Watch prepares a transaction and marks the keys to be watched
// for conditional execution if there are any keys.
//
-// The transaction is automatically closed when the fn exits.
+// The transaction is automatically closed when fn exits.
func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
tx := c.newTx()
if len(keys) > 0 {
diff --git a/src/dma/vendor/github.com/go-redis/redis/universal.go b/src/dma/vendor/github.com/go-redis/redis/universal.go
index 9e30c81d..a6075624 100644
--- a/src/dma/vendor/github.com/go-redis/redis/universal.go
+++ b/src/dma/vendor/github.com/go-redis/redis/universal.go
@@ -12,35 +12,38 @@ type UniversalOptions struct {
// of cluster/sentinel nodes.
Addrs []string
- // The sentinel master name.
- // Only failover clients.
- MasterName string
-
// Database to be selected after connecting to the server.
// Only single-node and failover clients.
DB int
- // Only cluster clients.
-
- // Enables read only queries on slave nodes.
- ReadOnly bool
-
- MaxRedirects int
- RouteByLatency bool
-
- // Common options
+ // Common options.
OnConnect func(*Conn) error
- MaxRetries int
Password string
+ MaxRetries int
+ MinRetryBackoff time.Duration
+ MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
TLSConfig *tls.Config
+
+ // Only cluster clients.
+
+ MaxRedirects int
+ ReadOnly bool
+ RouteByLatency bool
+ RouteRandomly bool
+
+ // The sentinel master name.
+ // Only failover clients.
+ MasterName string
}
func (o *UniversalOptions) cluster() *ClusterOptions {
@@ -49,22 +52,31 @@ func (o *UniversalOptions) cluster() *ClusterOptions {
}
return &ClusterOptions{
- Addrs: o.Addrs,
+ Addrs: o.Addrs,
+ OnConnect: o.OnConnect,
+
+ Password: o.Password,
+
MaxRedirects: o.MaxRedirects,
- RouteByLatency: o.RouteByLatency,
ReadOnly: o.ReadOnly,
+ RouteByLatency: o.RouteByLatency,
+ RouteRandomly: o.RouteRandomly,
+
+ MaxRetries: o.MaxRetries,
+ MinRetryBackoff: o.MinRetryBackoff,
+ MaxRetryBackoff: o.MaxRetryBackoff,
- OnConnect: o.OnConnect,
- MaxRetries: o.MaxRetries,
- Password: o.Password,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
+ MinIdleConns: o.MinIdleConns,
+ MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
- TLSConfig: o.TLSConfig,
+
+ TLSConfig: o.TLSConfig,
}
}
@@ -76,19 +88,27 @@ func (o *UniversalOptions) failover() *FailoverOptions {
return &FailoverOptions{
SentinelAddrs: o.Addrs,
MasterName: o.MasterName,
- DB: o.DB,
+ OnConnect: o.OnConnect,
+
+ DB: o.DB,
+ Password: o.Password,
+
+ MaxRetries: o.MaxRetries,
+ MinRetryBackoff: o.MinRetryBackoff,
+ MaxRetryBackoff: o.MaxRetryBackoff,
+
+ DialTimeout: o.DialTimeout,
+ ReadTimeout: o.ReadTimeout,
+ WriteTimeout: o.WriteTimeout,
- OnConnect: o.OnConnect,
- MaxRetries: o.MaxRetries,
- Password: o.Password,
- DialTimeout: o.DialTimeout,
- ReadTimeout: o.ReadTimeout,
- WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
+ MinIdleConns: o.MinIdleConns,
+ MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
- TLSConfig: o.TLSConfig,
+
+ TLSConfig: o.TLSConfig,
}
}
@@ -99,20 +119,28 @@ func (o *UniversalOptions) simple() *Options {
}
return &Options{
- Addr: addr,
- DB: o.DB,
+ Addr: addr,
+ OnConnect: o.OnConnect,
+
+ DB: o.DB,
+ Password: o.Password,
+
+ MaxRetries: o.MaxRetries,
+ MinRetryBackoff: o.MinRetryBackoff,
+ MaxRetryBackoff: o.MaxRetryBackoff,
+
+ DialTimeout: o.DialTimeout,
+ ReadTimeout: o.ReadTimeout,
+ WriteTimeout: o.WriteTimeout,
- OnConnect: o.OnConnect,
- MaxRetries: o.MaxRetries,
- Password: o.Password,
- DialTimeout: o.DialTimeout,
- ReadTimeout: o.ReadTimeout,
- WriteTimeout: o.WriteTimeout,
PoolSize: o.PoolSize,
+ MinIdleConns: o.MinIdleConns,
+ MaxConnAge: o.MaxConnAge,
PoolTimeout: o.PoolTimeout,
IdleTimeout: o.IdleTimeout,
IdleCheckFrequency: o.IdleCheckFrequency,
- TLSConfig: o.TLSConfig,
+
+ TLSConfig: o.TLSConfig,
}
}
diff --git a/src/dma/vendor/github.com/labstack/echo/.travis.yml b/src/dma/vendor/github.com/labstack/echo/.travis.yml
index 05e53b16..30346d7f 100644
--- a/src/dma/vendor/github.com/labstack/echo/.travis.yml
+++ b/src/dma/vendor/github.com/labstack/echo/.travis.yml
@@ -1,12 +1,14 @@
language: go
go:
- - 1.9.x
- - 1.10.x
+ - 1.11.x
- tip
+env:
+ - GO111MODULE=on
install:
- - make dependency
+ - go get -v golang.org/x/lint/golint
script:
- - make test
+ - golint -set_exit_status ./...
+ - go test -race -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
matrix:
diff --git a/src/dma/vendor/github.com/labstack/echo/Gopkg.lock b/src/dma/vendor/github.com/labstack/echo/Gopkg.lock
deleted file mode 100644
index f3c3b8d2..00000000
--- a/src/dma/vendor/github.com/labstack/echo/Gopkg.lock
+++ /dev/null
@@ -1,75 +0,0 @@
-# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
-
-
-[[projects]]
- name = "github.com/davecgh/go-spew"
- packages = ["spew"]
- revision = "346938d642f2ec3594ed81d874461961cd0faa76"
- version = "v1.1.0"
-
-[[projects]]
- name = "github.com/dgrijalva/jwt-go"
- packages = ["."]
- revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e"
- version = "v3.2.0"
-
-[[projects]]
- name = "github.com/labstack/gommon"
- packages = ["bytes","color","log","random"]
- revision = "6fe1405d73ec4bd4cd8a4ac8e2a2b2bf95d03954"
- version = "0.2.4"
-
-[[projects]]
- name = "github.com/mattn/go-colorable"
- packages = ["."]
- revision = "167de6bfdfba052fa6b2d3664c8f5272e23c9072"
- version = "v0.0.9"
-
-[[projects]]
- name = "github.com/mattn/go-isatty"
- packages = ["."]
- revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39"
- version = "v0.0.3"
-
-[[projects]]
- name = "github.com/pmezard/go-difflib"
- packages = ["difflib"]
- revision = "792786c7400a136282c1664665ae0a8db921c6c2"
- version = "v1.0.0"
-
-[[projects]]
- name = "github.com/stretchr/testify"
- packages = ["assert"]
- revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71"
- version = "v1.2.1"
-
-[[projects]]
- branch = "master"
- name = "github.com/valyala/bytebufferpool"
- packages = ["."]
- revision = "e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7"
-
-[[projects]]
- branch = "master"
- name = "github.com/valyala/fasttemplate"
- packages = ["."]
- revision = "dcecefd839c4193db0d35b88ec65b4c12d360ab0"
-
-[[projects]]
- branch = "master"
- name = "golang.org/x/crypto"
- packages = ["acme","acme/autocert"]
- revision = "182114d582623c1caa54f73de9c7224e23a48487"
-
-[[projects]]
- branch = "master"
- name = "golang.org/x/sys"
- packages = ["unix"]
- revision = "c28acc882ebcbfbe8ce9f0f14b9ac26ee138dd51"
-
-[solve-meta]
- analyzer-name = "dep"
- analyzer-version = 1
- inputs-digest = "9c7b45e80fe353405800cf01f429b3a203cfb8d4468a04c64a908e11a98ea764"
- solver-name = "gps-cdcl"
- solver-version = 1
diff --git a/src/dma/vendor/github.com/labstack/echo/Gopkg.toml b/src/dma/vendor/github.com/labstack/echo/Gopkg.toml
deleted file mode 100644
index 61de60cb..00000000
--- a/src/dma/vendor/github.com/labstack/echo/Gopkg.toml
+++ /dev/null
@@ -1,42 +0,0 @@
-
-# Gopkg.toml example
-#
-# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
-# for detailed Gopkg.toml documentation.
-#
-# required = ["github.com/user/thing/cmd/thing"]
-# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
-#
-# [[constraint]]
-# name = "github.com/user/project"
-# version = "1.0.0"
-#
-# [[constraint]]
-# name = "github.com/user/project2"
-# branch = "dev"
-# source = "github.com/myfork/project2"
-#
-# [[override]]
-# name = "github.com/x/y"
-# version = "2.4.0"
-
-
-[[constraint]]
- name = "github.com/dgrijalva/jwt-go"
- version = "3.2.0"
-
-[[constraint]]
- name = "github.com/labstack/gommon"
- version = "0.2.4"
-
-[[constraint]]
- name = "github.com/stretchr/testify"
- version = "1.2.1"
-
-[[constraint]]
- branch = "master"
- name = "github.com/valyala/fasttemplate"
-
-[[constraint]]
- branch = "master"
- name = "golang.org/x/crypto" \ No newline at end of file
diff --git a/src/dma/vendor/github.com/labstack/echo/Makefile b/src/dma/vendor/github.com/labstack/echo/Makefile
index 494667d8..dfcb6c02 100644
--- a/src/dma/vendor/github.com/labstack/echo/Makefile
+++ b/src/dma/vendor/github.com/labstack/echo/Makefile
@@ -1,17 +1,3 @@
-DEP_VERSION=0.4.1
-
-dependency:
- curl -fsSL -o ${GOPATH}/bin/dep https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64
- chmod +x ${GOPATH}/bin/dep
- dep ensure
-
-test:
- echo "" > coverage.txt
- for d in $(shell go list ./... | grep -v vendor); do \
- go test -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
- [ -f profile.out ] && cat profile.out >> coverage.txt && rm profile.out; \
- done
-
tag:
@git tag `grep -P '^\tversion = ' echo.go|cut -f2 -d'"'`
@git tag|grep -v ^v
diff --git a/src/dma/vendor/github.com/labstack/echo/README.md b/src/dma/vendor/github.com/labstack/echo/README.md
index 0f609df4..49e4d3b1 100644
--- a/src/dma/vendor/github.com/labstack/echo/README.md
+++ b/src/dma/vendor/github.com/labstack/echo/README.md
@@ -32,7 +32,7 @@ Date: 2018/03/15<br>
Source: https://github.com/vishr/web-framework-benchmark<br>
Lower is better!
-<img src="https://api.labstack.com/chart/bar?values=37223,55382,2985,5265|42013,59865,3350,6424&labels=Static,GitHub%20API,Parse%20API,Gplus%20API&titles=Echo,Gin&colors=lightseagreen,goldenrod&x_title=Routes&y_title=ns/op">
+<img src="https://i.imgur.com/I32VdMJ.png">
## [Guide](https://echo.labstack.com/guide)
diff --git a/src/dma/vendor/github.com/labstack/echo/bind.go b/src/dma/vendor/github.com/labstack/echo/bind.go
index 38e07150..4998e25b 100644
--- a/src/dma/vendor/github.com/labstack/echo/bind.go
+++ b/src/dma/vendor/github.com/labstack/echo/bind.go
@@ -31,9 +31,9 @@ type (
func (b *DefaultBinder) Bind(i interface{}, c Context) (err error) {
req := c.Request()
if req.ContentLength == 0 {
- if req.Method == GET || req.Method == DELETE {
+ if req.Method == http.MethodGet || req.Method == http.MethodDelete {
if err = b.bindData(i, c.QueryParams(), "query"); err != nil {
- return NewHTTPError(http.StatusBadRequest, err.Error())
+ return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err)
}
return
}
@@ -44,30 +44,32 @@ func (b *DefaultBinder) Bind(i interface{}, c Context) (err error) {
case strings.HasPrefix(ctype, MIMEApplicationJSON):
if err = json.NewDecoder(req.Body).Decode(i); err != nil {
if ute, ok := err.(*json.UnmarshalTypeError); ok {
- return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unmarshal type error: expected=%v, got=%v, offset=%v", ute.Type, ute.Value, ute.Offset))
+ return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unmarshal type error: expected=%v, got=%v, field=%v, offset=%v", ute.Type, ute.Value, ute.Field, ute.Offset)).SetInternal(err)
} else if se, ok := err.(*json.SyntaxError); ok {
- return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: offset=%v, error=%v", se.Offset, se.Error()))
+ return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: offset=%v, error=%v", se.Offset, se.Error())).SetInternal(err)
} else {
- return NewHTTPError(http.StatusBadRequest, err.Error())
+ return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err)
}
+ return NewHTTPError(http.StatusBadRequest, err.Error())
}
case strings.HasPrefix(ctype, MIMEApplicationXML), strings.HasPrefix(ctype, MIMETextXML):
if err = xml.NewDecoder(req.Body).Decode(i); err != nil {
if ute, ok := err.(*xml.UnsupportedTypeError); ok {
- return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unsupported type error: type=%v, error=%v", ute.Type, ute.Error()))
+ return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unsupported type error: type=%v, error=%v", ute.Type, ute.Error())).SetInternal(err)
} else if se, ok := err.(*xml.SyntaxError); ok {
- return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: line=%v, error=%v", se.Line, se.Error()))
+ return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: line=%v, error=%v", se.Line, se.Error())).SetInternal(err)
} else {
- return NewHTTPError(http.StatusBadRequest, err.Error())
+ return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err)
}
+ return NewHTTPError(http.StatusBadRequest, err.Error())
}
case strings.HasPrefix(ctype, MIMEApplicationForm), strings.HasPrefix(ctype, MIMEMultipartForm):
params, err := c.FormParams()
if err != nil {
- return NewHTTPError(http.StatusBadRequest, err.Error())
+ return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err)
}
if err = b.bindData(i, params, "form"); err != nil {
- return NewHTTPError(http.StatusBadRequest, err.Error())
+ return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err)
}
default:
return ErrUnsupportedMediaType
@@ -96,15 +98,30 @@ func (b *DefaultBinder) bindData(ptr interface{}, data map[string][]string, tag
inputFieldName = typeField.Name
// If tag is nil, we inspect if the field is a struct.
if _, ok := bindUnmarshaler(structField); !ok && structFieldKind == reflect.Struct {
- err := b.bindData(structField.Addr().Interface(), data, tag)
- if err != nil {
+ if err := b.bindData(structField.Addr().Interface(), data, tag); err != nil {
return err
}
continue
}
}
+
inputValue, exists := data[inputFieldName]
if !exists {
+ // Go json.Unmarshal supports case insensitive binding. However the
+ // url params are bound case sensitive which is inconsistent. To
+ // fix this we must check all of the map values in a
+ // case-insensitive search.
+ inputFieldName = strings.ToLower(inputFieldName)
+ for k, v := range data {
+ if strings.ToLower(k) == inputFieldName {
+ inputValue = v
+ exists = true
+ break
+ }
+ }
+ }
+
+ if !exists {
continue
}
@@ -126,10 +143,9 @@ func (b *DefaultBinder) bindData(ptr interface{}, data map[string][]string, tag
}
}
val.Field(i).Set(slice)
- } else {
- if err := setWithProperType(typeField.Type.Kind(), inputValue[0], structField); err != nil {
- return err
- }
+ } else if err := setWithProperType(typeField.Type.Kind(), inputValue[0], structField); err != nil {
+ return err
+
}
}
return nil
diff --git a/src/dma/vendor/github.com/labstack/echo/context.go b/src/dma/vendor/github.com/labstack/echo/context.go
index cf780c51..d4722700 100644
--- a/src/dma/vendor/github.com/labstack/echo/context.go
+++ b/src/dma/vendor/github.com/labstack/echo/context.go
@@ -204,6 +204,7 @@ type (
const (
defaultMemory = 32 << 20 // 32 MB
indexPage = "index.html"
+ defaultIndent = " "
)
func (c *context) writeContentType(value string) {
@@ -256,14 +257,13 @@ func (c *context) Scheme() string {
}
func (c *context) RealIP() string {
- ra := c.request.RemoteAddr
if ip := c.request.Header.Get(HeaderXForwardedFor); ip != "" {
- ra = strings.Split(ip, ", ")[0]
- } else if ip := c.request.Header.Get(HeaderXRealIP); ip != "" {
- ra = ip
- } else {
- ra, _, _ = net.SplitHostPort(ra)
+ return strings.Split(ip, ", ")[0]
+ }
+ if ip := c.request.Header.Get(HeaderXRealIP); ip != "" {
+ return ip
}
+ ra, _, _ := net.SplitHostPort(c.request.RemoteAddr)
return ra
}
@@ -404,24 +404,46 @@ func (c *context) String(code int, s string) (err error) {
return c.Blob(code, MIMETextPlainCharsetUTF8, []byte(s))
}
-func (c *context) JSON(code int, i interface{}) (err error) {
+func (c *context) jsonPBlob(code int, callback string, i interface{}) (err error) {
+ enc := json.NewEncoder(c.response)
_, pretty := c.QueryParams()["pretty"]
if c.echo.Debug || pretty {
- return c.JSONPretty(code, i, " ")
+ enc.SetIndent("", " ")
}
- b, err := json.Marshal(i)
- if err != nil {
+ c.writeContentType(MIMEApplicationJavaScriptCharsetUTF8)
+ c.response.WriteHeader(code)
+ if _, err = c.response.Write([]byte(callback + "(")); err != nil {
+ return
+ }
+ if err = enc.Encode(i); err != nil {
+ return
+ }
+ if _, err = c.response.Write([]byte(");")); err != nil {
return
}
- return c.JSONBlob(code, b)
+ return
}
-func (c *context) JSONPretty(code int, i interface{}, indent string) (err error) {
- b, err := json.MarshalIndent(i, "", indent)
- if err != nil {
- return
+func (c *context) json(code int, i interface{}, indent string) error {
+ enc := json.NewEncoder(c.response)
+ if indent != "" {
+ enc.SetIndent("", indent)
+ }
+ c.writeContentType(MIMEApplicationJSONCharsetUTF8)
+ c.response.WriteHeader(code)
+ return enc.Encode(i)
+}
+
+func (c *context) JSON(code int, i interface{}) (err error) {
+ indent := ""
+ if _, pretty := c.QueryParams()["pretty"]; c.echo.Debug || pretty {
+ indent = defaultIndent
}
- return c.JSONBlob(code, b)
+ return c.json(code, i, indent)
+}
+
+func (c *context) JSONPretty(code int, i interface{}, indent string) (err error) {
+ return c.json(code, i, indent)
}
func (c *context) JSONBlob(code int, b []byte) (err error) {
@@ -429,11 +451,7 @@ func (c *context) JSONBlob(code int, b []byte) (err error) {
}
func (c *context) JSONP(code int, callback string, i interface{}) (err error) {
- b, err := json.Marshal(i)
- if err != nil {
- return
- }
- return c.JSONPBlob(code, callback, b)
+ return c.jsonPBlob(code, callback, i)
}
func (c *context) JSONPBlob(code int, callback string, b []byte) (err error) {
@@ -449,24 +467,29 @@ func (c *context) JSONPBlob(code int, callback string, b []byte) (err error) {
return
}
-func (c *context) XML(code int, i interface{}) (err error) {
- _, pretty := c.QueryParams()["pretty"]
- if c.echo.Debug || pretty {
- return c.XMLPretty(code, i, " ")
+func (c *context) xml(code int, i interface{}, indent string) (err error) {
+ c.writeContentType(MIMEApplicationXMLCharsetUTF8)
+ c.response.WriteHeader(code)
+ enc := xml.NewEncoder(c.response)
+ if indent != "" {
+ enc.Indent("", indent)
}
- b, err := xml.Marshal(i)
- if err != nil {
+ if _, err = c.response.Write([]byte(xml.Header)); err != nil {
return
}
- return c.XMLBlob(code, b)
+ return enc.Encode(i)
}
-func (c *context) XMLPretty(code int, i interface{}, indent string) (err error) {
- b, err := xml.MarshalIndent(i, "", indent)
- if err != nil {
- return
+func (c *context) XML(code int, i interface{}) (err error) {
+ indent := ""
+ if _, pretty := c.QueryParams()["pretty"]; c.echo.Debug || pretty {
+ indent = defaultIndent
}
- return c.XMLBlob(code, b)
+ return c.xml(code, i, indent)
+}
+
+func (c *context) XMLPretty(code int, i interface{}, indent string) (err error) {
+ return c.xml(code, i, indent)
}
func (c *context) XMLBlob(code int, b []byte) (err error) {
@@ -574,3 +597,4 @@ func (c *context) Reset(r *http.Request, w http.ResponseWriter) {
// NOTE: Don't reset because it has to have length c.echo.maxParam at all times
// c.pvalues = nil
}
+
diff --git a/src/dma/vendor/github.com/labstack/echo/echo.go b/src/dma/vendor/github.com/labstack/echo/echo.go
index 41ac6b5e..98286515 100644
--- a/src/dma/vendor/github.com/labstack/echo/echo.go
+++ b/src/dma/vendor/github.com/labstack/echo/echo.go
@@ -62,7 +62,7 @@ import (
type (
// Echo is the top-level framework instance.
Echo struct {
- stdLogger *stdLog.Logger
+ StdLogger *stdLog.Logger
colorer *color.Color
premiddleware []MiddlewareFunc
middleware []MiddlewareFunc
@@ -103,7 +103,7 @@ type (
// MiddlewareFunc defines a function to process middleware.
MiddlewareFunc func(HandlerFunc) HandlerFunc
- // HandlerFunc defines a function to server HTTP requests.
+ // HandlerFunc defines a function to serve HTTP requests.
HandlerFunc func(Context) error
// HTTPErrorHandler is a centralized HTTP error handler.
@@ -129,17 +129,18 @@ type (
)
// HTTP methods
+// NOTE: Deprecated, please use the stdlib constants directly instead.
const (
- CONNECT = "CONNECT"
- DELETE = "DELETE"
- GET = "GET"
- HEAD = "HEAD"
- OPTIONS = "OPTIONS"
- PATCH = "PATCH"
- POST = "POST"
- PROPFIND = "PROPFIND"
- PUT = "PUT"
- TRACE = "TRACE"
+ CONNECT = http.MethodConnect
+ DELETE = http.MethodDelete
+ GET = http.MethodGet
+ HEAD = http.MethodHead
+ OPTIONS = http.MethodOptions
+ PATCH = http.MethodPatch
+ POST = http.MethodPost
+ // PROPFIND = "PROPFIND"
+ PUT = http.MethodPut
+ TRACE = http.MethodTrace
)
// MIME types
@@ -165,6 +166,8 @@ const (
const (
charsetUTF8 = "charset=UTF-8"
+ // PROPFIND Method can be used on collection and property resources.
+ PROPFIND = "PROPFIND"
)
// Headers
@@ -217,7 +220,8 @@ const (
)
const (
- Version = "3.3.5"
+ // Version of Echo
+ Version = "3.3.10-dev"
website = "https://echo.labstack.com"
// http://patorjk.com/software/taag/#p=display&f=Small%20Slant&t=Echo
banner = `
@@ -234,16 +238,16 @@ ____________________________________O/_______
var (
methods = [...]string{
- CONNECT,
- DELETE,
- GET,
- HEAD,
- OPTIONS,
- PATCH,
- POST,
+ http.MethodConnect,
+ http.MethodDelete,
+ http.MethodGet,
+ http.MethodHead,
+ http.MethodOptions,
+ http.MethodPatch,
+ http.MethodPost,
PROPFIND,
- PUT,
- TRACE,
+ http.MethodPut,
+ http.MethodTrace,
}
)
@@ -255,6 +259,12 @@ var (
ErrForbidden = NewHTTPError(http.StatusForbidden)
ErrMethodNotAllowed = NewHTTPError(http.StatusMethodNotAllowed)
ErrStatusRequestEntityTooLarge = NewHTTPError(http.StatusRequestEntityTooLarge)
+ ErrTooManyRequests = NewHTTPError(http.StatusTooManyRequests)
+ ErrBadRequest = NewHTTPError(http.StatusBadRequest)
+ ErrBadGateway = NewHTTPError(http.StatusBadGateway)
+ ErrInternalServerError = NewHTTPError(http.StatusInternalServerError)
+ ErrRequestTimeout = NewHTTPError(http.StatusRequestTimeout)
+ ErrServiceUnavailable = NewHTTPError(http.StatusServiceUnavailable)
ErrValidatorNotRegistered = errors.New("validator not registered")
ErrRendererNotRegistered = errors.New("renderer not registered")
ErrInvalidRedirectCode = errors.New("invalid redirect status code")
@@ -289,7 +299,7 @@ func New() (e *Echo) {
e.HTTPErrorHandler = e.DefaultHTTPErrorHandler
e.Binder = &DefaultBinder{}
e.Logger.SetLevel(log.ERROR)
- e.stdLogger = stdLog.New(e.Logger.Output(), e.Logger.Prefix()+": ", 0)
+ e.StdLogger = stdLog.New(e.Logger.Output(), e.Logger.Prefix()+": ", 0)
e.pool.New = func() interface{} {
return e.NewContext(nil, nil)
}
@@ -326,7 +336,7 @@ func (e *Echo) DefaultHTTPErrorHandler(err error, c Context) {
code = he.Code
msg = he.Message
if he.Internal != nil {
- msg = fmt.Sprintf("%v, %v", err, he.Internal)
+ err = fmt.Errorf("%v, %v", err, he.Internal)
}
} else if e.Debug {
msg = err.Error()
@@ -337,11 +347,9 @@ func (e *Echo) DefaultHTTPErrorHandler(err error, c Context) {
msg = Map{"message": msg}
}
- e.Logger.Error(err)
-
// Send response
if !c.Response().Committed {
- if c.Request().Method == HEAD { // Issue #608
+ if c.Request().Method == http.MethodHead { // Issue #608
err = c.NoContent(code)
} else {
err = c.JSON(code, msg)
@@ -365,55 +373,55 @@ func (e *Echo) Use(middleware ...MiddlewareFunc) {
// CONNECT registers a new CONNECT route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) CONNECT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(CONNECT, path, h, m...)
+ return e.Add(http.MethodConnect, path, h, m...)
}
// DELETE registers a new DELETE route for a path with matching handler in the router
// with optional route-level middleware.
func (e *Echo) DELETE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(DELETE, path, h, m...)
+ return e.Add(http.MethodDelete, path, h, m...)
}
// GET registers a new GET route for a path with matching handler in the router
// with optional route-level middleware.
func (e *Echo) GET(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(GET, path, h, m...)
+ return e.Add(http.MethodGet, path, h, m...)
}
// HEAD registers a new HEAD route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) HEAD(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(HEAD, path, h, m...)
+ return e.Add(http.MethodHead, path, h, m...)
}
// OPTIONS registers a new OPTIONS route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) OPTIONS(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(OPTIONS, path, h, m...)
+ return e.Add(http.MethodOptions, path, h, m...)
}
// PATCH registers a new PATCH route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) PATCH(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(PATCH, path, h, m...)
+ return e.Add(http.MethodPatch, path, h, m...)
}
// POST registers a new POST route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) POST(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(POST, path, h, m...)
+ return e.Add(http.MethodPost, path, h, m...)
}
// PUT registers a new PUT route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) PUT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(PUT, path, h, m...)
+ return e.Add(http.MethodPut, path, h, m...)
}
// TRACE registers a new TRACE route for a path with matching handler in the
// router with optional route-level middleware.
func (e *Echo) TRACE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return e.Add(TRACE, path, h, m...)
+ return e.Add(http.MethodTrace, path, h, m...)
}
// Any registers a new route for all HTTP methods and path with matching handler
@@ -462,11 +470,11 @@ func static(i i, prefix, root string) *Route {
return i.GET(prefix+"/*", h)
}
-// File registers a new route with path to serve a static file.
-func (e *Echo) File(path, file string) *Route {
+// File registers a new route with path to serve a static file with optional route-level middleware.
+func (e *Echo) File(path, file string, m ...MiddlewareFunc) *Route {
return e.GET(path, func(c Context) error {
return c.File(file)
- })
+ }, m...)
}
// Add registers a new route for an HTTP method and path with matching handler
@@ -559,26 +567,17 @@ func (e *Echo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := e.pool.Get().(*context)
c.Reset(r, w)
- m := r.Method
h := NotFoundHandler
if e.premiddleware == nil {
- path := r.URL.RawPath
- if path == "" {
- path = r.URL.Path
- }
- e.router.Find(m, getPath(r), c)
+ e.router.Find(r.Method, getPath(r), c)
h = c.Handler()
for i := len(e.middleware) - 1; i >= 0; i-- {
h = e.middleware[i](h)
}
} else {
h = func(c Context) error {
- path := r.URL.RawPath
- if path == "" {
- path = r.URL.Path
- }
- e.router.Find(m, getPath(r), c)
+ e.router.Find(r.Method, getPath(r), c)
h := c.Handler()
for i := len(e.middleware) - 1; i >= 0; i-- {
h = e.middleware[i](h)
@@ -622,10 +621,6 @@ func (e *Echo) StartTLS(address string, certFile, keyFile string) (err error) {
// StartAutoTLS starts an HTTPS server using certificates automatically installed from https://letsencrypt.org.
func (e *Echo) StartAutoTLS(address string) error {
- if e.Listener == nil {
- go http.ListenAndServe(":http", e.AutoTLSManager.HTTPHandler(nil))
- }
-
s := e.TLSServer
s.TLSConfig = new(tls.Config)
s.TLSConfig.GetCertificate = e.AutoTLSManager.GetCertificate
@@ -645,7 +640,7 @@ func (e *Echo) startTLS(address string) error {
func (e *Echo) StartServer(s *http.Server) (err error) {
// Setup
e.colorer.SetOutput(e.Logger.Output())
- s.ErrorLog = e.stdLogger
+ s.ErrorLog = e.StdLogger
s.Handler = e
if e.Debug {
e.Logger.SetLevel(log.DEBUG)
@@ -689,7 +684,7 @@ func (e *Echo) Close() error {
return e.Server.Close()
}
-// Shutdown stops server the gracefully.
+// Shutdown stops the server gracefully.
// It internally calls `http.Server#Shutdown()`.
func (e *Echo) Shutdown(ctx stdContext.Context) error {
if err := e.TLSServer.Shutdown(ctx); err != nil {
@@ -712,6 +707,12 @@ func (he *HTTPError) Error() string {
return fmt.Sprintf("code=%d, message=%v", he.Code, he.Message)
}
+// SetInternal sets error to HTTPError.Internal
+func (he *HTTPError) SetInternal(err error) *HTTPError {
+ he.Internal = err
+ return he
+}
+
// WrapHandler wraps `http.Handler` into `echo.HandlerFunc`.
func WrapHandler(h http.Handler) HandlerFunc {
return func(c Context) error {
diff --git a/src/dma/vendor/github.com/labstack/echo/group.go b/src/dma/vendor/github.com/labstack/echo/group.go
index 5257e83c..3e3732b6 100644
--- a/src/dma/vendor/github.com/labstack/echo/group.go
+++ b/src/dma/vendor/github.com/labstack/echo/group.go
@@ -1,6 +1,7 @@
package echo
import (
+ "net/http"
"path"
)
@@ -29,47 +30,47 @@ func (g *Group) Use(middleware ...MiddlewareFunc) {
// CONNECT implements `Echo#CONNECT()` for sub-routes within the Group.
func (g *Group) CONNECT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(CONNECT, path, h, m...)
+ return g.Add(http.MethodConnect, path, h, m...)
}
// DELETE implements `Echo#DELETE()` for sub-routes within the Group.
func (g *Group) DELETE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(DELETE, path, h, m...)
+ return g.Add(http.MethodDelete, path, h, m...)
}
// GET implements `Echo#GET()` for sub-routes within the Group.
func (g *Group) GET(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(GET, path, h, m...)
+ return g.Add(http.MethodGet, path, h, m...)
}
// HEAD implements `Echo#HEAD()` for sub-routes within the Group.
func (g *Group) HEAD(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(HEAD, path, h, m...)
+ return g.Add(http.MethodHead, path, h, m...)
}
// OPTIONS implements `Echo#OPTIONS()` for sub-routes within the Group.
func (g *Group) OPTIONS(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(OPTIONS, path, h, m...)
+ return g.Add(http.MethodOptions, path, h, m...)
}
// PATCH implements `Echo#PATCH()` for sub-routes within the Group.
func (g *Group) PATCH(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(PATCH, path, h, m...)
+ return g.Add(http.MethodPatch, path, h, m...)
}
// POST implements `Echo#POST()` for sub-routes within the Group.
func (g *Group) POST(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(POST, path, h, m...)
+ return g.Add(http.MethodPost, path, h, m...)
}
// PUT implements `Echo#PUT()` for sub-routes within the Group.
func (g *Group) PUT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(PUT, path, h, m...)
+ return g.Add(http.MethodPut, path, h, m...)
}
// TRACE implements `Echo#TRACE()` for sub-routes within the Group.
func (g *Group) TRACE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route {
- return g.Add(TRACE, path, h, m...)
+ return g.Add(http.MethodTrace, path, h, m...)
}
// Any implements `Echo#Any()` for sub-routes within the Group.
diff --git a/src/dma/vendor/github.com/labstack/echo/log.go b/src/dma/vendor/github.com/labstack/echo/log.go
index b194c39c..3f8de590 100644
--- a/src/dma/vendor/github.com/labstack/echo/log.go
+++ b/src/dma/vendor/github.com/labstack/echo/log.go
@@ -15,6 +15,7 @@ type (
SetPrefix(p string)
Level() log.Lvl
SetLevel(v log.Lvl)
+ SetHeader(h string)
Print(i ...interface{})
Printf(format string, args ...interface{})
Printj(j log.JSON)
diff --git a/src/dma/vendor/github.com/labstack/echo/router.go b/src/dma/vendor/github.com/labstack/echo/router.go
index ff53da87..73f0b68b 100644
--- a/src/dma/vendor/github.com/labstack/echo/router.go
+++ b/src/dma/vendor/github.com/labstack/echo/router.go
@@ -1,5 +1,7 @@
package echo
+import "net/http"
+
type (
// Router is the registry of all registered routes for an `Echo` instance for
// request matching and URL path parameter parsing.
@@ -79,7 +81,7 @@ func (r *Router) Add(method, path string, h HandlerFunc) {
r.insert(method, path[:i], h, pkind, ppath, pnames)
return
}
- r.insert(method, path[:i], nil, pkind, ppath, pnames)
+ r.insert(method, path[:i], nil, pkind, "", nil)
} else if path[i] == '*' {
r.insert(method, path[:i], nil, skind, "", nil)
pnames = append(pnames, "*")
@@ -226,50 +228,50 @@ func (n *node) findChildByKind(t kind) *node {
func (n *node) addHandler(method string, h HandlerFunc) {
switch method {
- case CONNECT:
+ case http.MethodConnect:
n.methodHandler.connect = h
- case DELETE:
+ case http.MethodDelete:
n.methodHandler.delete = h
- case GET:
+ case http.MethodGet:
n.methodHandler.get = h
- case HEAD:
+ case http.MethodHead:
n.methodHandler.head = h
- case OPTIONS:
+ case http.MethodOptions:
n.methodHandler.options = h
- case PATCH:
+ case http.MethodPatch:
n.methodHandler.patch = h
- case POST:
+ case http.MethodPost:
n.methodHandler.post = h
case PROPFIND:
n.methodHandler.propfind = h
- case PUT:
+ case http.MethodPut:
n.methodHandler.put = h
- case TRACE:
+ case http.MethodTrace:
n.methodHandler.trace = h
}
}
func (n *node) findHandler(method string) HandlerFunc {
switch method {
- case CONNECT:
+ case http.MethodConnect:
return n.methodHandler.connect
- case DELETE:
+ case http.MethodDelete:
return n.methodHandler.delete
- case GET:
+ case http.MethodGet:
return n.methodHandler.get
- case HEAD:
+ case http.MethodHead:
return n.methodHandler.head
- case OPTIONS:
+ case http.MethodOptions:
return n.methodHandler.options
- case PATCH:
+ case http.MethodPatch:
return n.methodHandler.patch
- case POST:
+ case http.MethodPost:
return n.methodHandler.post
case PROPFIND:
return n.methodHandler.propfind
- case PUT:
+ case http.MethodPut:
return n.methodHandler.put
- case TRACE:
+ case http.MethodTrace:
return n.methodHandler.trace
default:
return nil
@@ -311,7 +313,7 @@ func (r *Router) Find(method, path string, c Context) {
// Search order static > param > any
for {
if search == "" {
- goto End
+ break
}
pl := 0 // Prefix length
@@ -346,7 +348,7 @@ func (r *Router) Find(method, path string, c Context) {
}
if search == "" {
- goto End
+ break
}
// Static node
@@ -403,10 +405,9 @@ func (r *Router) Find(method, path string, c Context) {
return
}
pvalues[len(cn.pnames)-1] = search
- goto End
+ break
}
-End:
ctx.handler = cn.findHandler(method)
ctx.path = cn.ppath
ctx.pnames = cn.pnames
diff --git a/src/dma/vendor/github.com/labstack/gommon/LICENSE b/src/dma/vendor/github.com/labstack/gommon/LICENSE
index d2ae3edf..fc718faf 100644
--- a/src/dma/vendor/github.com/labstack/gommon/LICENSE
+++ b/src/dma/vendor/github.com/labstack/gommon/LICENSE
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2015 labstack
+Copyright (c) 2018 labstack
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
diff --git a/src/dma/vendor/github.com/labstack/gommon/log/log.go b/src/dma/vendor/github.com/labstack/gommon/log/log.go
index 0d77a87a..132411db 100644
--- a/src/dma/vendor/github.com/labstack/gommon/log/log.go
+++ b/src/dma/vendor/github.com/labstack/gommon/log/log.go
@@ -219,7 +219,7 @@ func (l *Logger) Panic(i ...interface{}) {
func (l *Logger) Panicf(format string, args ...interface{}) {
l.log(panicLevel, format, args...)
- panic(fmt.Sprintf(format, args))
+ panic(fmt.Sprintf(format, args...))
}
func (l *Logger) Panicj(j JSON) {
diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go b/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go
index 3a3ef5b7..8c3960cc 100644
--- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go
+++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go
@@ -593,6 +593,7 @@ const (
DOMAIN_SHUTOFF_SAVED = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_SAVED)
DOMAIN_SHUTOFF_FAILED = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_FAILED)
DOMAIN_SHUTOFF_FROM_SNAPSHOT = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_FROM_SNAPSHOT)
+ DOMAIN_SHUTOFF_DAEMON = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_DAEMON)
)
type DomainBlockCommitFlags int
@@ -768,6 +769,7 @@ const (
DOMAIN_STATS_INTERFACE = DomainStatsTypes(C.VIR_DOMAIN_STATS_INTERFACE)
DOMAIN_STATS_BLOCK = DomainStatsTypes(C.VIR_DOMAIN_STATS_BLOCK)
DOMAIN_STATS_PERF = DomainStatsTypes(C.VIR_DOMAIN_STATS_PERF)
+ DOMAIN_STATS_IOTHREAD = DomainStatsTypes(C.VIR_DOMAIN_STATS_IOTHREAD)
)
type DomainCoreDumpFlags int
@@ -4206,6 +4208,57 @@ func (d *Domain) DelIOThread(id uint, flags DomainModificationImpact) error {
return nil
}
+// See also https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainSetIOThreadParams
+
+type DomainSetIOThreadParams struct {
+ PollMaxNsSet bool
+ PollMaxNs uint64
+ PollGrowSet bool
+ PollGrow uint
+ PollShrinkSet bool
+ PollShrink uint
+}
+
+func getSetIOThreadParamsFieldInfo(params *DomainSetIOThreadParams) map[string]typedParamsFieldInfo {
+ return map[string]typedParamsFieldInfo{
+ C.VIR_DOMAIN_IOTHREAD_POLL_MAX_NS: typedParamsFieldInfo{
+ set: &params.PollMaxNsSet,
+ ul: &params.PollMaxNs,
+ },
+ C.VIR_DOMAIN_IOTHREAD_POLL_GROW: typedParamsFieldInfo{
+ set: &params.PollGrowSet,
+ ui: &params.PollGrow,
+ },
+ C.VIR_DOMAIN_IOTHREAD_POLL_SHRINK: typedParamsFieldInfo{
+ set: &params.PollShrinkSet,
+ ui: &params.PollShrink,
+ },
+ }
+}
+
+func (d *Domain) SetIOThreadParams(iothreadid uint, params *DomainSetIOThreadParams, flags DomainModificationImpact) error {
+ if C.LIBVIR_VERSION_NUMBER < 4010000 {
+ return makeNotImplementedError("virDomainSetIOThreadParams")
+ }
+ info := getSetIOThreadParamsFieldInfo(params)
+
+ cparams, gerr := typedParamsPackNew(info)
+ if gerr != nil {
+ return gerr
+ }
+ nparams := len(*cparams)
+
+ defer C.virTypedParamsClear((*C.virTypedParameter)(unsafe.Pointer(&(*cparams)[0])), C.int(nparams))
+
+ var err C.virError
+ ret := C.virDomainSetIOThreadParamsWrapper(d.ptr, C.uint(iothreadid), (*C.virTypedParameter)(unsafe.Pointer(&(*cparams)[0])), C.int(nparams), C.uint(flags), &err)
+ if ret == -1 {
+ return makeError(&err)
+ }
+
+ return nil
+}
+
// See also https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainGetEmulatorPinInfo
func (d *Domain) GetEmulatorPinInfo(flags DomainModificationImpact) ([]bool, error) {
var cnodeinfo C.virNodeInfo
diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h
index d20631fc..19a3e24e 100644
--- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h
+++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h
@@ -895,7 +895,7 @@ struct _virDomainInterface {
#endif
#ifndef VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_ARP
-#define VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_ARP 1
+#define VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_ARP 2
#endif
/* 4.5.0 */
@@ -911,4 +911,26 @@ struct _virDomainInterface {
#define VIR_DOMAIN_MEMORY_STAT_DISK_CACHES 10
#endif
+/* 4.10.0 */
+
+#ifndef VIR_DOMAIN_SHUTOFF_DAEMON
+#define VIR_DOMAIN_SHUTOFF_DAEMON 8
+#endif
+
+#ifndef VIR_DOMAIN_STATS_IOTHREAD
+#define VIR_DOMAIN_STATS_IOTHREAD (1 << 7)
+#endif
+
+#ifndef VIR_DOMAIN_IOTHREAD_POLL_GROW
+#define VIR_DOMAIN_IOTHREAD_POLL_GROW "poll_grow"
+#endif
+
+#ifndef VIR_DOMAIN_IOTHREAD_POLL_SHRINK
+#define VIR_DOMAIN_IOTHREAD_POLL_SHRINK "poll_shrink"
+#endif
+
+#ifndef VIR_DOMAIN_IOTHREAD_POLL_MAX_NS
+#define VIR_DOMAIN_IOTHREAD_POLL_MAX_NS "poll_max_ns"
+#endif
+
#endif /* LIBVIRT_GO_DOMAIN_COMPAT_H__ */
diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go
index b42dd42f..f674bd51 100644
--- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go
+++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go
@@ -1914,6 +1914,26 @@ virDomainSetGuestVcpusWrapper(virDomainPtr domain,
int
+virDomainSetIOThreadParamsWrapper(virDomainPtr domain,
+ unsigned int iothread_id,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags,
+ virErrorPtr err)
+{
+#if LIBVIR_VERSION_NUMBER < 4010000
+ assert(0); // Caller should have checked version
+#else
+ int ret = virDomainSetIOThreadParams(domain, iothread_id, params, nparams, flags);
+ if (ret < 0) {
+ virCopyLastError(err);
+ }
+ return ret;
+#endif
+}
+
+
+int
virDomainSetInterfaceParametersWrapper(virDomainPtr domain,
const char *device,
virTypedParameterPtr params,
diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h
index 7bd82826..48a4cd39 100644
--- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h
+++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h
@@ -814,6 +814,14 @@ virDomainSetGuestVcpusWrapper(virDomainPtr domain,
virErrorPtr err);
int
+virDomainSetIOThreadParamsWrapper(virDomainPtr domain,
+ unsigned int iothread_id,
+ virTypedParameterPtr params,
+ int nparams,
+ unsigned int flags,
+ virErrorPtr err);
+
+int
virDomainSetInterfaceParametersWrapper(virDomainPtr domain,
const char *device,
virTypedParameterPtr params,
diff --git a/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go b/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go
index e17a5474..404e10ca 100644
--- a/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go
+++ b/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go
@@ -29,6 +29,15 @@ const (
backgroundMask = (backgroundRed | backgroundBlue | backgroundGreen | backgroundIntensity)
)
+const (
+ genericRead = 0x80000000
+ genericWrite = 0x40000000
+)
+
+const (
+ consoleTextmodeBuffer = 0x1
+)
+
type wchar uint16
type short int16
type dword uint32
@@ -69,14 +78,17 @@ var (
procGetConsoleCursorInfo = kernel32.NewProc("GetConsoleCursorInfo")
procSetConsoleCursorInfo = kernel32.NewProc("SetConsoleCursorInfo")
procSetConsoleTitle = kernel32.NewProc("SetConsoleTitleW")
+ procCreateConsoleScreenBuffer = kernel32.NewProc("CreateConsoleScreenBuffer")
)
// Writer provide colorable Writer to the console
type Writer struct {
- out io.Writer
- handle syscall.Handle
- oldattr word
- oldpos coord
+ out io.Writer
+ handle syscall.Handle
+ althandle syscall.Handle
+ oldattr word
+ oldpos coord
+ rest bytes.Buffer
}
// NewColorable return new instance of Writer which handle escape sequence from File.
@@ -407,7 +419,18 @@ func (w *Writer) Write(data []byte) (n int, err error) {
var csbi consoleScreenBufferInfo
procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
- er := bytes.NewReader(data)
+ handle := w.handle
+
+ var er *bytes.Reader
+ if w.rest.Len() > 0 {
+ var rest bytes.Buffer
+ w.rest.WriteTo(&rest)
+ w.rest.Reset()
+ rest.Write(data)
+ er = bytes.NewReader(rest.Bytes())
+ } else {
+ er = bytes.NewReader(data)
+ }
var bw [1]byte
loop:
for {
@@ -425,29 +448,55 @@ loop:
break loop
}
- if c2 == ']' {
- if err := doTitleSequence(er); err != nil {
+ switch c2 {
+ case '>':
+ continue
+ case ']':
+ w.rest.WriteByte(c1)
+ w.rest.WriteByte(c2)
+ er.WriteTo(&w.rest)
+ if bytes.IndexByte(w.rest.Bytes(), 0x07) == -1 {
break loop
}
+ er = bytes.NewReader(w.rest.Bytes()[2:])
+ err := doTitleSequence(er)
+ if err != nil {
+ break loop
+ }
+ w.rest.Reset()
continue
- }
- if c2 != 0x5b {
+ // https://github.com/mattn/go-colorable/issues/27
+ case '7':
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
+ w.oldpos = csbi.cursorPosition
+ continue
+ case '8':
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&w.oldpos)))
+ continue
+ case 0x5b:
+ // execute part after switch
+ default:
continue
}
+ w.rest.WriteByte(c1)
+ w.rest.WriteByte(c2)
+ er.WriteTo(&w.rest)
+
var buf bytes.Buffer
var m byte
- for {
- c, err := er.ReadByte()
- if err != nil {
- break loop
- }
+ for i, c := range w.rest.Bytes()[2:] {
if ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || c == '@' {
m = c
+ er = bytes.NewReader(w.rest.Bytes()[2+i+1:])
+ w.rest.Reset()
break
}
buf.Write([]byte(string(c)))
}
+ if m == 0 {
+ break loop
+ }
switch m {
case 'A':
@@ -455,61 +504,64 @@ loop:
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.y -= short(n)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'B':
n, err = strconv.Atoi(buf.String())
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.y += short(n)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'C':
n, err = strconv.Atoi(buf.String())
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.x += short(n)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'D':
n, err = strconv.Atoi(buf.String())
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.x -= short(n)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ if csbi.cursorPosition.x < 0 {
+ csbi.cursorPosition.x = 0
+ }
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'E':
n, err = strconv.Atoi(buf.String())
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.x = 0
csbi.cursorPosition.y += short(n)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'F':
n, err = strconv.Atoi(buf.String())
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.x = 0
csbi.cursorPosition.y -= short(n)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'G':
n, err = strconv.Atoi(buf.String())
if err != nil {
continue
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
csbi.cursorPosition.x = short(n - 1)
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'H', 'f':
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
if buf.Len() > 0 {
token := strings.Split(buf.String(), ";")
switch len(token) {
@@ -534,7 +586,7 @@ loop:
} else {
csbi.cursorPosition.y = 0
}
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition)))
case 'J':
n := 0
if buf.Len() > 0 {
@@ -545,20 +597,20 @@ loop:
}
var count, written dword
var cursor coord
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
switch n {
case 0:
cursor = coord{x: csbi.cursorPosition.x, y: csbi.cursorPosition.y}
- count = dword(csbi.size.x - csbi.cursorPosition.x + (csbi.size.y-csbi.cursorPosition.y)*csbi.size.x)
+ count = dword(csbi.size.x) - dword(csbi.cursorPosition.x) + dword(csbi.size.y-csbi.cursorPosition.y)*dword(csbi.size.x)
case 1:
cursor = coord{x: csbi.window.left, y: csbi.window.top}
- count = dword(csbi.size.x - csbi.cursorPosition.x + (csbi.window.top-csbi.cursorPosition.y)*csbi.size.x)
+ count = dword(csbi.size.x) - dword(csbi.cursorPosition.x) + dword(csbi.window.top-csbi.cursorPosition.y)*dword(csbi.size.x)
case 2:
cursor = coord{x: csbi.window.left, y: csbi.window.top}
- count = dword(csbi.size.x - csbi.cursorPosition.x + (csbi.size.y-csbi.cursorPosition.y)*csbi.size.x)
+ count = dword(csbi.size.x) - dword(csbi.cursorPosition.x) + dword(csbi.size.y-csbi.cursorPosition.y)*dword(csbi.size.x)
}
- procFillConsoleOutputCharacter.Call(uintptr(w.handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
- procFillConsoleOutputAttribute.Call(uintptr(w.handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
+ procFillConsoleOutputCharacter.Call(uintptr(handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
+ procFillConsoleOutputAttribute.Call(uintptr(handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
case 'K':
n := 0
if buf.Len() > 0 {
@@ -567,28 +619,28 @@ loop:
continue
}
}
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
var cursor coord
var count, written dword
switch n {
case 0:
- cursor = coord{x: csbi.cursorPosition.x + 1, y: csbi.cursorPosition.y}
- count = dword(csbi.size.x - csbi.cursorPosition.x - 1)
+ cursor = coord{x: csbi.cursorPosition.x, y: csbi.cursorPosition.y}
+ count = dword(csbi.size.x - csbi.cursorPosition.x)
case 1:
- cursor = coord{x: csbi.window.left, y: csbi.window.top + csbi.cursorPosition.y}
+ cursor = coord{x: csbi.window.left, y: csbi.cursorPosition.y}
count = dword(csbi.size.x - csbi.cursorPosition.x)
case 2:
- cursor = coord{x: csbi.window.left, y: csbi.window.top + csbi.cursorPosition.y}
+ cursor = coord{x: csbi.window.left, y: csbi.cursorPosition.y}
count = dword(csbi.size.x)
}
- procFillConsoleOutputCharacter.Call(uintptr(w.handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
- procFillConsoleOutputAttribute.Call(uintptr(w.handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
+ procFillConsoleOutputCharacter.Call(uintptr(handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
+ procFillConsoleOutputAttribute.Call(uintptr(handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written)))
case 'm':
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
attr := csbi.attributes
cs := buf.String()
if cs == "" {
- procSetConsoleTextAttribute.Call(uintptr(w.handle), uintptr(w.oldattr))
+ procSetConsoleTextAttribute.Call(uintptr(handle), uintptr(w.oldattr))
continue
}
token := strings.Split(cs, ";")
@@ -627,6 +679,21 @@ loop:
attr |= n256foreAttr[n256]
i += 2
}
+ } else if len(token) == 5 && token[i+1] == "2" {
+ var r, g, b int
+ r, _ = strconv.Atoi(token[i+2])
+ g, _ = strconv.Atoi(token[i+3])
+ b, _ = strconv.Atoi(token[i+4])
+ i += 4
+ if r > 127 {
+ attr |= foregroundRed
+ }
+ if g > 127 {
+ attr |= foregroundGreen
+ }
+ if b > 127 {
+ attr |= foregroundBlue
+ }
} else {
attr = attr & (w.oldattr & backgroundMask)
}
@@ -654,6 +721,21 @@ loop:
attr |= n256backAttr[n256]
i += 2
}
+ } else if len(token) == 5 && token[i+1] == "2" {
+ var r, g, b int
+ r, _ = strconv.Atoi(token[i+2])
+ g, _ = strconv.Atoi(token[i+3])
+ b, _ = strconv.Atoi(token[i+4])
+ i += 4
+ if r > 127 {
+ attr |= backgroundRed
+ }
+ if g > 127 {
+ attr |= backgroundGreen
+ }
+ if b > 127 {
+ attr |= backgroundBlue
+ }
} else {
attr = attr & (w.oldattr & foregroundMask)
}
@@ -685,38 +767,52 @@ loop:
attr |= backgroundBlue
}
}
- procSetConsoleTextAttribute.Call(uintptr(w.handle), uintptr(attr))
+ procSetConsoleTextAttribute.Call(uintptr(handle), uintptr(attr))
}
}
case 'h':
var ci consoleCursorInfo
cs := buf.String()
if cs == "5>" {
- procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
ci.visible = 0
- procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
} else if cs == "?25" {
- procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
ci.visible = 1
- procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
+ } else if cs == "?1049" {
+ if w.althandle == 0 {
+ h, _, _ := procCreateConsoleScreenBuffer.Call(uintptr(genericRead|genericWrite), 0, 0, uintptr(consoleTextmodeBuffer), 0, 0)
+ w.althandle = syscall.Handle(h)
+ if w.althandle != 0 {
+ handle = w.althandle
+ }
+ }
}
case 'l':
var ci consoleCursorInfo
cs := buf.String()
if cs == "5>" {
- procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
ci.visible = 1
- procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
} else if cs == "?25" {
- procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
ci.visible = 0
- procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci)))
+ procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci)))
+ } else if cs == "?1049" {
+ if w.althandle != 0 {
+ syscall.CloseHandle(w.althandle)
+ w.althandle = 0
+ handle = w.handle
+ }
}
case 's':
- procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
+ procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
w.oldpos = csbi.cursorPosition
case 'u':
- procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&w.oldpos)))
+ procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&w.oldpos)))
}
}
diff --git a/src/dma/vendor/github.com/mattn/go-colorable/go.mod b/src/dma/vendor/github.com/mattn/go-colorable/go.mod
new file mode 100644
index 00000000..9d9f4248
--- /dev/null
+++ b/src/dma/vendor/github.com/mattn/go-colorable/go.mod
@@ -0,0 +1,3 @@
+module github.com/mattn/go-colorable
+
+require github.com/mattn/go-isatty v0.0.5
diff --git a/src/dma/vendor/github.com/mattn/go-colorable/go.sum b/src/dma/vendor/github.com/mattn/go-colorable/go.sum
new file mode 100644
index 00000000..2c12960e
--- /dev/null
+++ b/src/dma/vendor/github.com/mattn/go-colorable/go.sum
@@ -0,0 +1,4 @@
+github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw=
+github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml b/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml
index b9f8b239..5597e026 100644
--- a/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml
+++ b/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml
@@ -2,6 +2,10 @@ language: go
go:
- tip
+os:
+ - linux
+ - osx
+
before_install:
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/go.mod b/src/dma/vendor/github.com/mattn/go-isatty/go.mod
new file mode 100644
index 00000000..f310320c
--- /dev/null
+++ b/src/dma/vendor/github.com/mattn/go-isatty/go.mod
@@ -0,0 +1,3 @@
+module github.com/mattn/go-isatty
+
+require golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/go.sum b/src/dma/vendor/github.com/mattn/go-isatty/go.sum
new file mode 100644
index 00000000..426c8973
--- /dev/null
+++ b/src/dma/vendor/github.com/mattn/go-isatty/go.sum
@@ -0,0 +1,2 @@
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go
index 44e5d213..d3567cb5 100644
--- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go
+++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go
@@ -1,12 +1,10 @@
-// +build linux
-// +build ppc64 ppc64le
+// +build android
package isatty
import (
+ "syscall"
"unsafe"
-
- syscall "golang.org/x/sys/unix"
)
const ioctlReadTermios = syscall.TCGETS
@@ -17,3 +15,9 @@ func IsTerminal(fd uintptr) bool {
_, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0)
return err == 0
}
+
+// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2
+// terminal. This is also always false on this environment.
+func IsCygwinTerminal(fd uintptr) bool {
+ return false
+}
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go
deleted file mode 100644
index 9584a988..00000000
--- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// +build appengine
-
-package isatty
-
-// IsTerminal returns true if the file descriptor is terminal which
-// is always false on on appengine classic which is a sandboxed PaaS.
-func IsTerminal(fd uintptr) bool {
- return false
-}
-
-// IsCygwinTerminal() return true if the file descriptor is a cygwin or msys2
-// terminal. This is also always false on this environment.
-func IsCygwinTerminal(fd uintptr) bool {
- return false
-}
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go
index 42f2514d..07e93039 100644
--- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go
+++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go
@@ -16,3 +16,9 @@ func IsTerminal(fd uintptr) bool {
_, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0)
return err == 0
}
+
+// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2
+// terminal. This is also always false on this environment.
+func IsCygwinTerminal(fd uintptr) bool {
+ return false
+}
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go
index 7384cf99..4f8af465 100644
--- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go
+++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go
@@ -1,18 +1,19 @@
// +build linux
-// +build !appengine,!ppc64,!ppc64le
+// +build !appengine
+// +build !android
package isatty
-import (
- "syscall"
- "unsafe"
-)
-
-const ioctlReadTermios = syscall.TCGETS
+import "golang.org/x/sys/unix"
// IsTerminal return true if the file descriptor is terminal.
func IsTerminal(fd uintptr) bool {
- var termios syscall.Termios
- _, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0)
- return err == 0
+ _, err := unix.IoctlGetTermios(int(fd), unix.TCGETS)
+ return err == nil
+}
+
+// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2
+// terminal. This is also always false on this environment.
+func IsCygwinTerminal(fd uintptr) bool {
+ return false
}
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go
index ff4de3d9..f02849c5 100644
--- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go
+++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go
@@ -1,8 +1,13 @@
-// +build !windows
-// +build !appengine
+// +build appengine js
package isatty
+// IsTerminal returns true if the file descriptor is terminal which
+// is always false on js and appengine classic which is a sandboxed PaaS.
+func IsTerminal(fd uintptr) bool {
+ return false
+}
+
// IsCygwinTerminal() return true if the file descriptor is a cygwin or msys2
// terminal. This is also always false on this environment.
func IsCygwinTerminal(fd uintptr) bool {
diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go
index 1f0c6bf5..bdd5c79a 100644
--- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go
+++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go
@@ -14,3 +14,9 @@ func IsTerminal(fd uintptr) bool {
err := unix.IoctlSetTermio(int(fd), unix.TCGETA, &termio)
return err == nil
}
+
+// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2
+// terminal. This is also always false on this environment.
+func IsCygwinTerminal(fd uintptr) bool {
+ return false
+}
diff --git a/src/dma/vendor/github.com/streadway/amqp/.gitignore b/src/dma/vendor/github.com/streadway/amqp/.gitignore
index ba8a7056..667fb50c 100644
--- a/src/dma/vendor/github.com/streadway/amqp/.gitignore
+++ b/src/dma/vendor/github.com/streadway/amqp/.gitignore
@@ -2,3 +2,11 @@ certs/*
spec/spec
examples/simple-consumer/simple-consumer
examples/simple-producer/simple-producer
+
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+.idea/**/contentModel.xml
diff --git a/src/dma/vendor/github.com/streadway/amqp/.travis.yml b/src/dma/vendor/github.com/streadway/amqp/.travis.yml
index 7166964c..2d22a7af 100644
--- a/src/dma/vendor/github.com/streadway/amqp/.travis.yml
+++ b/src/dma/vendor/github.com/streadway/amqp/.travis.yml
@@ -1,17 +1,18 @@
language: go
go:
- - 1.9.x
- 1.10.x
+ - 1.11.x
+ - 1.12.x
services:
- rabbitmq
env:
- - AMQP_URL=amqp://guest:guest@127.0.0.1:5672/
+ - GO111MODULE=on AMQP_URL=amqp://guest:guest@127.0.0.1:5672/
before_install:
- - go get -v github.com/golang/lint/golint
+ - go get -v golang.org/x/lint/golint
script:
- ./pre-commit
diff --git a/src/dma/vendor/github.com/streadway/amqp/LICENSE b/src/dma/vendor/github.com/streadway/amqp/LICENSE
index 243c0ce7..07b89680 100644
--- a/src/dma/vendor/github.com/streadway/amqp/LICENSE
+++ b/src/dma/vendor/github.com/streadway/amqp/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
+Copyright (c) 2012-2019, Sean Treadway, SoundCloud Ltd.
All rights reserved.
Redistribution and use in source and binary forms, with or without
diff --git a/src/dma/vendor/github.com/streadway/amqp/README.md b/src/dma/vendor/github.com/streadway/amqp/README.md
index 099db276..287830b2 100644
--- a/src/dma/vendor/github.com/streadway/amqp/README.md
+++ b/src/dma/vendor/github.com/streadway/amqp/README.md
@@ -15,7 +15,7 @@ enhancements.
## Supported Go Versions
-This library supports two most recent Go release series, currently 1.8 and 1.9.
+This library supports two most recent Go release series, currently 1.10 and 1.11.
## Supported RabbitMQ Versions
diff --git a/src/dma/vendor/github.com/streadway/amqp/auth.go b/src/dma/vendor/github.com/streadway/amqp/auth.go
index ebc765b6..435c94b1 100644
--- a/src/dma/vendor/github.com/streadway/amqp/auth.go
+++ b/src/dma/vendor/github.com/streadway/amqp/auth.go
@@ -32,6 +32,22 @@ func (auth *PlainAuth) Response() string {
return fmt.Sprintf("\000%s\000%s", auth.Username, auth.Password)
}
+// AMQPlainAuth is similar to PlainAuth
+type AMQPlainAuth struct {
+ Username string
+ Password string
+}
+
+// Mechanism returns "AMQPLAIN"
+func (auth *AMQPlainAuth) Mechanism() string {
+ return "AMQPLAIN"
+}
+
+// Response returns the null character delimited encoding for the SASL PLAIN Mechanism.
+func (auth *AMQPlainAuth) Response() string {
+ return fmt.Sprintf("LOGIN:%sPASSWORD:%s", auth.Username, auth.Password)
+}
+
// Finds the first mechanism preferred by the client that the server supports.
func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) {
for _, auth = range client {
diff --git a/src/dma/vendor/github.com/streadway/amqp/channel.go b/src/dma/vendor/github.com/streadway/amqp/channel.go
index dd2552ca..3898ed78 100644
--- a/src/dma/vendor/github.com/streadway/amqp/channel.go
+++ b/src/dma/vendor/github.com/streadway/amqp/channel.go
@@ -889,7 +889,7 @@ and exchanges will also be restored on server restart.
If the binding could not complete, an error will be returned and the channel
will be closed.
-When noWait is true and the queue could not be bound, the channel will be
+When noWait is false and the queue could not be bound, the channel will be
closed with an error.
*/
@@ -1580,6 +1580,9 @@ multiple messages, reducing the amount of protocol messages to exchange.
See also Delivery.Reject
*/
func (ch *Channel) Reject(tag uint64, requeue bool) error {
+ ch.m.Lock()
+ defer ch.m.Unlock()
+
return ch.send(&basicReject{
DeliveryTag: tag,
Requeue: requeue,
diff --git a/src/dma/vendor/github.com/streadway/amqp/connection.go b/src/dma/vendor/github.com/streadway/amqp/connection.go
index ca1372d0..b9d8e8ee 100644
--- a/src/dma/vendor/github.com/streadway/amqp/connection.go
+++ b/src/dma/vendor/github.com/streadway/amqp/connection.go
@@ -111,21 +111,23 @@ type readDeadliner interface {
SetReadDeadline(time.Time) error
}
-// defaultDial establishes a connection when config.Dial is not provided
-func defaultDial(network, addr string) (net.Conn, error) {
- conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
- if err != nil {
- return nil, err
- }
+// DefaultDial establishes a connection when config.Dial is not provided
+func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
+ return func(network, addr string) (net.Conn, error) {
+ conn, err := net.DialTimeout(network, addr, connectionTimeout)
+ if err != nil {
+ return nil, err
+ }
- // Heartbeating hasn't started yet, don't stall forever on a dead server.
- // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
- // the deadline is cleared in openComplete.
- if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
- return nil, err
- }
+ // Heartbeating hasn't started yet, don't stall forever on a dead server.
+ // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
+ // the deadline is cleared in openComplete.
+ if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
+ return nil, err
+ }
- return conn, nil
+ return conn, nil
+ }
}
// Dial accepts a string in the AMQP URI format and returns a new Connection
@@ -180,7 +182,7 @@ func DialConfig(url string, config Config) (*Connection, error) {
dialer := config.Dial
if dialer == nil {
- dialer = defaultDial
+ dialer = DefaultDial(defaultConnectionTimeout)
}
conn, err = dialer("tcp", addr)
@@ -201,6 +203,7 @@ func DialConfig(url string, config Config) (*Connection, error) {
client := tls.Client(conn, config.TLSClientConfig)
if err := client.Handshake(); err != nil {
+
conn.Close()
return nil, err
}
@@ -317,7 +320,7 @@ including the underlying io, Channels, Notify listeners and Channel consumers
will also be closed.
*/
func (c *Connection) Close() error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
@@ -332,7 +335,7 @@ func (c *Connection) Close() error {
}
func (c *Connection) closeWith(err *Error) error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
@@ -346,12 +349,14 @@ func (c *Connection) closeWith(err *Error) error {
)
}
-func (c *Connection) isClosed() bool {
+// IsClosed returns true if the connection is marked as closed, otherwise false
+// is returned.
+func (c *Connection) IsClosed() bool {
return (atomic.LoadInt32(&c.closed) == 1)
}
func (c *Connection) send(f frame) error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
@@ -591,7 +596,7 @@ func (c *Connection) allocateChannel() (*Channel, error) {
c.m.Lock()
defer c.m.Unlock()
- if c.isClosed() {
+ if c.IsClosed() {
return nil, ErrClosed
}
diff --git a/src/dma/vendor/github.com/streadway/amqp/delivery.go b/src/dma/vendor/github.com/streadway/amqp/delivery.go
index 304c8346..72412644 100644
--- a/src/dma/vendor/github.com/streadway/amqp/delivery.go
+++ b/src/dma/vendor/github.com/streadway/amqp/delivery.go
@@ -52,7 +52,7 @@ type Delivery struct {
DeliveryTag uint64
Redelivered bool
- Exchange string // basic.publish exhange
+ Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
Body []byte
diff --git a/src/dma/vendor/github.com/streadway/amqp/go.mod b/src/dma/vendor/github.com/streadway/amqp/go.mod
new file mode 100644
index 00000000..4eeab334
--- /dev/null
+++ b/src/dma/vendor/github.com/streadway/amqp/go.mod
@@ -0,0 +1,3 @@
+module github.com/streadway/amqp
+
+go 1.10
diff --git a/src/dma/vendor/github.com/streadway/amqp/pre-commit b/src/dma/vendor/github.com/streadway/amqp/pre-commit
index 7607f467..37155300 100755
--- a/src/dma/vendor/github.com/streadway/amqp/pre-commit
+++ b/src/dma/vendor/github.com/streadway/amqp/pre-commit
@@ -1,29 +1,67 @@
#!/bin/sh
-GOFMT_FILES=$(gofmt -l .)
-if [ -n "${GOFMT_FILES}" ]; then
- printf >&2 'gofmt failed for the following files:\n%s\n\nplease run "gofmt -w ." on your changes before committing.\n' "${GOFMT_FILES}"
- exit 1
-fi
-
-GOLINT_ERRORS=$(golint ./... | grep -v "Id should be")
-if [ -n "${GOLINT_ERRORS}" ]; then
- printf >&2 'golint failed for the following reasons:\n%s\n\nplease run 'golint ./...' on your changes before committing.\n' "${GOLINT_ERRORS}"
- exit 1
-fi
-
-GOVET_ERRORS=$(go tool vet *.go 2>&1)
-if [ -n "${GOVET_ERRORS}" ]; then
- printf >&2 'go vet failed for the following reasons:\n%s\n\nplease run "go tool vet *.go" on your changes before committing.\n' "${GOVET_ERRORS}"
- exit 1
-fi
-
-if [ -z "${NOTEST}" ]; then
- printf >&2 'Running short tests...\n'
- env AMQP_URL= go test -short -v | egrep 'PASS|ok'
-
- if [ $? -ne 0 ]; then
- printf >&2 'go test failed, please fix before committing.\n'
+LATEST_STABLE_SUPPORTED_GO_VERSION="1.11"
+
+main() {
+ if local_go_version_is_latest_stable
+ then
+ run_gofmt
+ run_golint
+ run_govet
+ fi
+ run_unit_tests
+}
+
+local_go_version_is_latest_stable() {
+ go version | grep -q $LATEST_STABLE_SUPPORTED_GO_VERSION
+}
+
+log_error() {
+ echo "$*" 1>&2
+}
+
+run_gofmt() {
+ GOFMT_FILES=$(gofmt -l .)
+ if [ -n "$GOFMT_FILES" ]
+ then
+ log_error "gofmt failed for the following files:
+$GOFMT_FILES
+
+please run 'gofmt -w .' on your changes before committing."
exit 1
fi
-fi
+}
+
+run_golint() {
+ GOLINT_ERRORS=$(golint ./... | grep -v "Id should be")
+ if [ -n "$GOLINT_ERRORS" ]
+ then
+ log_error "golint failed for the following reasons:
+$GOLINT_ERRORS
+
+please run 'golint ./...' on your changes before committing."
+ exit 1
+ fi
+}
+
+run_govet() {
+ GOVET_ERRORS=$(go tool vet ./*.go 2>&1)
+ if [ -n "$GOVET_ERRORS" ]
+ then
+ log_error "go vet failed for the following reasons:
+$GOVET_ERRORS
+
+please run 'go tool vet ./*.go' on your changes before committing."
+ exit 1
+ fi
+}
+
+run_unit_tests() {
+ if [ -z "$NOTEST" ]
+ then
+ log_error 'Running short tests...'
+ env AMQP_URL= go test -short
+ fi
+}
+
+main
diff --git a/src/dma/vendor/github.com/streadway/amqp/types.go b/src/dma/vendor/github.com/streadway/amqp/types.go
index ff5ea3cb..d3ece707 100644
--- a/src/dma/vendor/github.com/streadway/amqp/types.go
+++ b/src/dma/vendor/github.com/streadway/amqp/types.go
@@ -200,6 +200,7 @@ type Decimal struct {
// byte
// float32
// float64
+// int
// int16
// int32
// int64
@@ -225,7 +226,7 @@ type Table map[string]interface{}
func validateField(f interface{}) error {
switch fv := f.(type) {
- case nil, bool, byte, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
+ case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
return nil
case []interface{}:
diff --git a/src/dma/vendor/github.com/streadway/amqp/uri.go b/src/dma/vendor/github.com/streadway/amqp/uri.go
index 35fefdc2..e5847154 100644
--- a/src/dma/vendor/github.com/streadway/amqp/uri.go
+++ b/src/dma/vendor/github.com/streadway/amqp/uri.go
@@ -125,6 +125,15 @@ func (uri URI) PlainAuth() *PlainAuth {
}
}
+// AMQPlainAuth returns a PlainAuth structure based on the parsed URI's
+// Username and Password fields.
+func (uri URI) AMQPlainAuth() *AMQPlainAuth {
+ return &AMQPlainAuth{
+ Username: uri.Username,
+ Password: uri.Password,
+ }
+}
+
func (uri URI) String() string {
authority, err := url.Parse("")
if err != nil {
diff --git a/src/dma/vendor/github.com/streadway/amqp/write.go b/src/dma/vendor/github.com/streadway/amqp/write.go
index 58ed20d6..94a46d11 100644
--- a/src/dma/vendor/github.com/streadway/amqp/write.go
+++ b/src/dma/vendor/github.com/streadway/amqp/write.go
@@ -308,6 +308,11 @@ func writeField(w io.Writer, value interface{}) (err error) {
binary.BigEndian.PutUint16(buf[1:3], uint16(v))
enc = buf[:3]
+ case int:
+ buf[0] = 'I'
+ binary.BigEndian.PutUint32(buf[1:5], uint32(v))
+ enc = buf[:5]
+
case int32:
buf[0] = 'I'
binary.BigEndian.PutUint32(buf[1:5], uint32(v))
diff --git a/src/dma/vendor/github.com/valyala/fasttemplate/go.mod b/src/dma/vendor/github.com/valyala/fasttemplate/go.mod
new file mode 100644
index 00000000..6015c4b5
--- /dev/null
+++ b/src/dma/vendor/github.com/valyala/fasttemplate/go.mod
@@ -0,0 +1,3 @@
+module github.com/valyala/fasttemplate
+
+require github.com/valyala/bytebufferpool v1.0.0
diff --git a/src/dma/vendor/github.com/valyala/fasttemplate/go.sum b/src/dma/vendor/github.com/valyala/fasttemplate/go.sum
new file mode 100644
index 00000000..c10c48c2
--- /dev/null
+++ b/src/dma/vendor/github.com/valyala/fasttemplate/go.sum
@@ -0,0 +1,2 @@
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=