From 4d11ca17d0f73f5bd783f45900118295fdfed46b Mon Sep 17 00:00:00 2001 From: Tomofumi Hayashi Date: Sat, 27 Apr 2019 20:38:39 +0900 Subject: barometer: update DMA's vendoring packages Change-Id: I0578b094f1ecdaed20c906be2ba29d51b8089d7c Signed-off-by: Tomofumi Hayashi --- src/dma/vendor/github.com/BurntSushi/toml/COPYING | 27 +- .../BurntSushi/toml/cmd/toml-test-decoder/COPYING | 27 +- .../BurntSushi/toml/cmd/toml-test-encoder/COPYING | 27 +- .../github.com/BurntSushi/toml/cmd/tomlv/COPYING | 27 +- src/dma/vendor/github.com/BurntSushi/toml/lex.go | 2 +- .../vendor/github.com/go-redis/redis/.travis.yml | 3 +- .../vendor/github.com/go-redis/redis/CHANGELOG.md | 13 + src/dma/vendor/github.com/go-redis/redis/Makefile | 4 +- .../vendor/github.com/go-redis/redis/cluster.go | 499 +++++------ .../vendor/github.com/go-redis/redis/command.go | 993 ++++++++++++++++++--- .../vendor/github.com/go-redis/redis/commands.go | 380 ++++++-- .../github.com/go-redis/redis/internal/error.go | 30 +- .../go-redis/redis/internal/pool/conn.go | 51 +- .../go-redis/redis/internal/pool/pool.go | 132 ++- .../go-redis/redis/internal/proto/reader.go | 162 ++-- .../go-redis/redis/internal/proto/write_buffer.go | 113 --- .../go-redis/redis/internal/proto/writer.go | 159 ++++ .../redis/internal/singleflight/singleflight.go | 64 -- .../go-redis/redis/internal/util/safe.go | 4 + .../go-redis/redis/internal/util/unsafe.go | 10 + .../vendor/github.com/go-redis/redis/options.go | 27 +- src/dma/vendor/github.com/go-redis/redis/parser.go | 394 -------- .../vendor/github.com/go-redis/redis/pipeline.go | 7 + src/dma/vendor/github.com/go-redis/redis/pubsub.go | 127 +-- src/dma/vendor/github.com/go-redis/redis/redis.go | 173 +++- src/dma/vendor/github.com/go-redis/redis/result.go | 2 +- src/dma/vendor/github.com/go-redis/redis/ring.go | 106 ++- .../vendor/github.com/go-redis/redis/sentinel.go | 252 ++++-- src/dma/vendor/github.com/go-redis/redis/tx.go | 4 +- .../vendor/github.com/go-redis/redis/universal.go | 102 ++- .../vendor/github.com/labstack/echo/.travis.yml | 10 +- src/dma/vendor/github.com/labstack/echo/Gopkg.lock | 75 -- src/dma/vendor/github.com/labstack/echo/Gopkg.toml | 42 - src/dma/vendor/github.com/labstack/echo/Makefile | 14 - src/dma/vendor/github.com/labstack/echo/README.md | 2 +- src/dma/vendor/github.com/labstack/echo/bind.go | 48 +- src/dma/vendor/github.com/labstack/echo/context.go | 90 +- src/dma/vendor/github.com/labstack/echo/echo.go | 113 +-- src/dma/vendor/github.com/labstack/echo/group.go | 19 +- src/dma/vendor/github.com/labstack/echo/log.go | 1 + src/dma/vendor/github.com/labstack/echo/router.go | 47 +- src/dma/vendor/github.com/labstack/gommon/LICENSE | 2 +- .../vendor/github.com/labstack/gommon/log/log.go | 2 +- .../vendor/github.com/libvirt/libvirt-go/domain.go | 53 ++ .../github.com/libvirt/libvirt-go/domain_compat.h | 24 +- .../libvirt/libvirt-go/domain_wrapper.go | 20 + .../github.com/libvirt/libvirt-go/domain_wrapper.h | 8 + .../mattn/go-colorable/colorable_windows.go | 208 +++-- .../vendor/github.com/mattn/go-colorable/go.mod | 3 + .../vendor/github.com/mattn/go-colorable/go.sum | 4 + .../vendor/github.com/mattn/go-isatty/.travis.yml | 4 + src/dma/vendor/github.com/mattn/go-isatty/go.mod | 3 + src/dma/vendor/github.com/mattn/go-isatty/go.sum | 2 + .../github.com/mattn/go-isatty/isatty_android.go | 23 + .../github.com/mattn/go-isatty/isatty_appengine.go | 15 - .../github.com/mattn/go-isatty/isatty_bsd.go | 6 + .../github.com/mattn/go-isatty/isatty_linux.go | 21 +- .../mattn/go-isatty/isatty_linux_ppc64x.go | 19 - .../github.com/mattn/go-isatty/isatty_others.go | 9 +- .../github.com/mattn/go-isatty/isatty_solaris.go | 6 + .../vendor/github.com/streadway/amqp/.gitignore | 8 + .../vendor/github.com/streadway/amqp/.travis.yml | 7 +- src/dma/vendor/github.com/streadway/amqp/LICENSE | 2 +- src/dma/vendor/github.com/streadway/amqp/README.md | 2 +- src/dma/vendor/github.com/streadway/amqp/auth.go | 16 + .../vendor/github.com/streadway/amqp/channel.go | 5 +- .../vendor/github.com/streadway/amqp/connection.go | 43 +- .../vendor/github.com/streadway/amqp/delivery.go | 2 +- src/dma/vendor/github.com/streadway/amqp/go.mod | 3 + .../vendor/github.com/streadway/amqp/pre-commit | 88 +- src/dma/vendor/github.com/streadway/amqp/types.go | 3 +- src/dma/vendor/github.com/streadway/amqp/uri.go | 9 + src/dma/vendor/github.com/streadway/amqp/write.go | 5 + .../vendor/github.com/valyala/fasttemplate/go.mod | 3 + .../vendor/github.com/valyala/fasttemplate/go.sum | 2 + 75 files changed, 3068 insertions(+), 1944 deletions(-) delete mode 100644 src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go create mode 100644 src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go delete mode 100644 src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go delete mode 100644 src/dma/vendor/github.com/go-redis/redis/parser.go delete mode 100644 src/dma/vendor/github.com/labstack/echo/Gopkg.lock delete mode 100644 src/dma/vendor/github.com/labstack/echo/Gopkg.toml create mode 100644 src/dma/vendor/github.com/mattn/go-colorable/go.mod create mode 100644 src/dma/vendor/github.com/mattn/go-colorable/go.sum create mode 100644 src/dma/vendor/github.com/mattn/go-isatty/go.mod create mode 100644 src/dma/vendor/github.com/mattn/go-isatty/go.sum create mode 100644 src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go delete mode 100644 src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go delete mode 100644 src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go create mode 100644 src/dma/vendor/github.com/streadway/amqp/go.mod create mode 100644 src/dma/vendor/github.com/valyala/fasttemplate/go.mod create mode 100644 src/dma/vendor/github.com/valyala/fasttemplate/go.sum (limited to 'src/dma/vendor/github.com') diff --git a/src/dma/vendor/github.com/BurntSushi/toml/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/COPYING index 5a8e3325..01b57432 100644 --- a/src/dma/vendor/github.com/BurntSushi/toml/COPYING +++ b/src/dma/vendor/github.com/BurntSushi/toml/COPYING @@ -1,14 +1,21 @@ - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - Version 2, December 2004 +The MIT License (MIT) - Copyright (C) 2004 Sam Hocevar +Copyright (c) 2013 TOML authors - Everyone is permitted to copy and distribute verbatim or modified - copies of this license document, and changing it is allowed as long - as the name is changed. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION - - 0. You just DO WHAT THE FUCK YOU WANT TO. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING index 5a8e3325..01b57432 100644 --- a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING +++ b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-decoder/COPYING @@ -1,14 +1,21 @@ - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - Version 2, December 2004 +The MIT License (MIT) - Copyright (C) 2004 Sam Hocevar +Copyright (c) 2013 TOML authors - Everyone is permitted to copy and distribute verbatim or modified - copies of this license document, and changing it is allowed as long - as the name is changed. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION - - 0. You just DO WHAT THE FUCK YOU WANT TO. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING index 5a8e3325..01b57432 100644 --- a/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING +++ b/src/dma/vendor/github.com/BurntSushi/toml/cmd/toml-test-encoder/COPYING @@ -1,14 +1,21 @@ - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - Version 2, December 2004 +The MIT License (MIT) - Copyright (C) 2004 Sam Hocevar +Copyright (c) 2013 TOML authors - Everyone is permitted to copy and distribute verbatim or modified - copies of this license document, and changing it is allowed as long - as the name is changed. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION - - 0. You just DO WHAT THE FUCK YOU WANT TO. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING b/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING index 5a8e3325..01b57432 100644 --- a/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING +++ b/src/dma/vendor/github.com/BurntSushi/toml/cmd/tomlv/COPYING @@ -1,14 +1,21 @@ - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - Version 2, December 2004 +The MIT License (MIT) - Copyright (C) 2004 Sam Hocevar +Copyright (c) 2013 TOML authors - Everyone is permitted to copy and distribute verbatim or modified - copies of this license document, and changing it is allowed as long - as the name is changed. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: - DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE - TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION - - 0. You just DO WHAT THE FUCK YOU WANT TO. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/src/dma/vendor/github.com/BurntSushi/toml/lex.go b/src/dma/vendor/github.com/BurntSushi/toml/lex.go index 6dee7fc7..e0a742a8 100644 --- a/src/dma/vendor/github.com/BurntSushi/toml/lex.go +++ b/src/dma/vendor/github.com/BurntSushi/toml/lex.go @@ -775,7 +775,7 @@ func lexDatetime(lx *lexer) stateFn { return lexDatetime } switch r { - case '-', 'T', ':', '.', 'Z': + case '-', 'T', ':', '.', 'Z', '+': return lexDatetime } diff --git a/src/dma/vendor/github.com/go-redis/redis/.travis.yml b/src/dma/vendor/github.com/go-redis/redis/.travis.yml index 39ffc2be..6b110b4c 100644 --- a/src/dma/vendor/github.com/go-redis/redis/.travis.yml +++ b/src/dma/vendor/github.com/go-redis/redis/.travis.yml @@ -5,10 +5,9 @@ services: - redis-server go: - - 1.7.x - - 1.8.x - 1.9.x - 1.10.x + - 1.11.x - tip matrix: diff --git a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md index cb0e1b8e..19645661 100644 --- a/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md +++ b/src/dma/vendor/github.com/go-redis/redis/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## Unreleased + +- Cluster and Ring pipelines process commands for each node in its own goroutine. + +## 6.14 + +- Added Options.MinIdleConns. +- Added Options.MaxConnAge. +- PoolStats.FreeConns is renamed to PoolStats.IdleConns. +- Add Client.Do to simplify creating custom commands. +- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers. +- Lower memory usage. + ## v6.13 - Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards. diff --git a/src/dma/vendor/github.com/go-redis/redis/Makefile b/src/dma/vendor/github.com/go-redis/redis/Makefile index 1fbdac91..fa3b4e00 100644 --- a/src/dma/vendor/github.com/go-redis/redis/Makefile +++ b/src/dma/vendor/github.com/go-redis/redis/Makefile @@ -3,6 +3,8 @@ all: testdeps go test ./... -short -race env GOOS=linux GOARCH=386 go test ./... go vet + go get github.com/gordonklaus/ineffassign + ineffassign . testdeps: testdata/redis/src/redis-server @@ -13,7 +15,7 @@ bench: testdeps testdata/redis: mkdir -p $@ - wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@ + wget -qO- https://github.com/antirez/redis/archive/5.0.tar.gz | tar xvz --strip-components=1 -C $@ testdata/redis/src/redis-server: testdata/redis sed -i.bak 's/libjemalloc.a/libjemalloc.a -lrt/g' $ 0 { @@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) { n := rand.Intn(len(nodes)-1) + 1 slave = nodes[n] if !slave.Loading() { - break + return slave, nil } } - return slave, nil + + // All slaves are loading - use master. + return nodes[0], nil } } @@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode { return nil } -func (c *clusterState) IsConsistent() bool { - if c.nodes.opt.ClusterSlots != nil { - return true - } - return len(c.Masters) <= len(c.Slaves) -} - //------------------------------------------------------------------------------ type clusterStateHolder struct { load func() (*clusterState, error) - state atomic.Value - - firstErrMu sync.RWMutex - firstErr error - + state atomic.Value reloading uint32 // atomic } @@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder } func (c *clusterStateHolder) Reload() (*clusterState, error) { - state, err := c.reload() - if err != nil { - return nil, err - } - if !state.IsConsistent() { - time.AfterFunc(time.Second, c.LazyReload) - } - return state, nil -} - -func (c *clusterStateHolder) reload() (*clusterState, error) { state, err := c.load() if err != nil { - c.firstErrMu.Lock() - if c.firstErr == nil { - c.firstErr = err - } - c.firstErrMu.Unlock() return nil, err } c.state.Store(state) @@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() { go func() { defer atomic.StoreUint32(&c.reloading, 0) - for { - state, err := c.reload() - if err != nil { - return - } - time.Sleep(100 * time.Millisecond) - if state.IsConsistent() { - return - } + _, err := c.Reload() + if err != nil { + return } + time.Sleep(100 * time.Millisecond) }() } @@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) { } return state, nil } - - c.firstErrMu.RLock() - err := c.firstErr - c.firstErrMu.RUnlock() - if err != nil { - return nil, err - } - - return nil, errors.New("redis: cluster has no state") + return c.Reload() } func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) { @@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { c.processTxPipeline = c.defaultProcessTxPipeline c.init() - - _, _ = c.state.Reload() - _, _ = c.cmdsInfoCache.Get() - if opt.IdleCheckFrequency > 0 { go c.reaper(opt.IdleCheckFrequency) } @@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } -// ReloadState reloads cluster state. It calls ClusterSlots func +func (c *ClusterClient) init() { + c.cmdable.setProcessor(c.Process) +} + +// ReloadState reloads cluster state. If available it calls ClusterSlots func // to get cluster slots information. func (c *ClusterClient) ReloadState() error { _, err := c.state.Reload() return err } -func (c *ClusterClient) init() { - c.cmdable.setProcessor(c.Process) -} - func (c *ClusterClient) Context() context.Context { if c.ctx != nil { return c.ctx @@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int { } func (c *ClusterClient) cmdSlot(cmd Cmder) int { + args := cmd.Args() + if args[0] == "cluster" && args[1] == "getkeysinslot" { + return args[2].(int) + } + cmdInfo := c.cmdInfo(cmd.Name()) return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) } @@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) { } cmdInfo := c.cmdInfo(cmd.Name()) - slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) + slot := c.cmdSlot(cmd) - if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { + if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly { if c.opt.RouteByLatency { node, err := state.slotClosestNode(slot) return slot, node, err @@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { if err == nil { break } - - if internal.IsRetryableError(err, true) { + if err != Nil { c.state.LazyReload() - continue } moved, ask, addr := internal.IsMovedError(err) if moved || ask { - c.state.LazyReload() node, err = c.nodes.GetOrCreate(addr) if err != nil { return err @@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } - if err == pool.ErrClosed { + if err == pool.ErrClosed || internal.IsReadOnlyError(err) { node, err = c.slotMasterNode(slot) if err != nil { return err @@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } + if internal.IsRetryableError(err, true) { + continue + } + return err } @@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error { return c.nodes.Close() } +// Do creates a Cmd from the args and processes the cmd. +func (c *ClusterClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + func (c *ClusterClient) WrapProcess( fn func(oldProcess func(Cmder) error) func(Cmder) error, ) { @@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { if err == nil { break } + if err != Nil { + c.state.LazyReload() + } - // If slave is loading - read from master. + // If slave is loading - pick another node. if c.opt.ReadOnly && internal.IsLoadingError(err) { node.MarkAsLoading() - continue - } - - if internal.IsRetryableError(err, true) { - c.state.LazyReload() - - // First retry the same node. - if attempt == 0 { - continue - } - - // Second try random node. - node, err = c.nodes.Random() - if err != nil { - break - } + node = nil continue } @@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { var addr string moved, ask, addr = internal.IsMovedError(err) if moved || ask { - c.state.LazyReload() - node, err = c.nodes.GetOrCreate(addr) if err != nil { break @@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { continue } - if err == pool.ErrClosed { + if err == pool.ErrClosed || internal.IsReadOnlyError(err) { node = nil continue } + if internal.IsRetryableError(err, true) { + // First retry the same node. + if attempt == 0 { + continue + } + + // Second try random node. + node, err = c.nodes.Random() + if err != nil { + break + } + continue + } + break } @@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns acc.StaleConns += s.StaleConns } @@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline( } func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { - cmdsMap, err := c.mapCmdsByNode(cmds) + cmdsMap := newCmdsMap() + err := c.mapCmdsByNode(cmds, cmdsMap) if err != nil { setCmdsErr(cmds, err) return err @@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := make(map[*clusterNode][]Cmder) + failedCmds := newCmdsMap() + var wg sync.WaitGroup - for node, cmds := range cmdsMap { - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.remapCmds(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) + for node, cmds := range cmdsMap.m { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return } - continue - } - err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) - if err == nil || internal.IsRedisError(err) { - node.Client.connPool.Put(cn) - } else { - node.Client.connPool.Remove(cn) - } + err = c.pipelineProcessCmds(node, cn, cmds, failedCmds) + node.Client.releaseConnStrict(cn, err) + }(node, cmds) } - if len(failedCmds) == 0 { + wg.Wait() + if len(failedCmds.m) == 0 { break } cmdsMap = failedCmds } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) { +type cmdsMap struct { + mu sync.Mutex + m map[*clusterNode][]Cmder +} + +func newCmdsMap() *cmdsMap { + return &cmdsMap{ + m: make(map[*clusterNode][]Cmder), + } +} + +func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error { state, err := c.state.Get() if err != nil { setCmdsErr(cmds, err) - return nil, err + return err } - cmdsMap := make(map[*clusterNode][]Cmder) cmdsAreReadOnly := c.cmdsAreReadOnly(cmds) for _, cmd := range cmds { var node *clusterNode @@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e node, err = state.slotMasterNode(slot) } if err != nil { - return nil, err + return err } - cmdsMap[node] = append(cmdsMap[node], cmd) + cmdsMap.mu.Lock() + cmdsMap.m[node] = append(cmdsMap.m[node], cmd) + cmdsMap.mu.Unlock() } - return cmdsMap, nil + return nil } func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { @@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool { return true } -func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { - remappedCmds, err := c.mapCmdsByNode(cmds) - if err != nil { - setCmdsErr(cmds, err) - return - } - - for node, cmds := range remappedCmds { - failedCmds[node] = cmds - } -} - func (c *ClusterClient) pipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { - cn.SetWriteTimeout(c.opt.WriteTimeout) - - err := writeCmd(cn, cmds...) + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) if err != nil { setCmdsErr(cmds, err) - failedCmds[node] = cmds + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() return err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - return c.pipelineReadCmds(cn, cmds, failedCmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return c.pipelineReadCmds(node, rd, cmds, failedCmds) + }) + return err } func (c *ClusterClient) pipelineReadCmds( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { + var firstErr error for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err == nil { continue } @@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds( continue } - return err + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() + if firstErr == nil { + firstErr = err + } } - return nil + return firstErr } func (c *ClusterClient) checkMovedErr( - cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder, + cmd Cmder, err error, failedCmds *cmdsMap, ) bool { moved, ask, addr := internal.IsMovedError(err) @@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds[node] = append(failedCmds[node], cmd) + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], cmd) + failedCmds.mu.Unlock() return true } @@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr( return false } - failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Lock() + failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd) + failedCmds.mu.Unlock() return true } @@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } - failedCmds := make(map[*clusterNode][]Cmder) + failedCmds := newCmdsMap() + var wg sync.WaitGroup for node, cmds := range cmdsMap { - cn, err := node.Client.getConn() - if err != nil { - if err == pool.ErrClosed { - c.remapCmds(cmds, failedCmds) - } else { - setCmdsErr(cmds, err) + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + + cn, err := node.Client.getConn() + if err != nil { + if err == pool.ErrClosed { + c.mapCmdsByNode(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } + return } - continue - } - err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - if err == nil || internal.IsRedisError(err) { - node.Client.connPool.Put(cn) - } else { - node.Client.connPool.Remove(cn) - } + err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) + node.Client.releaseConnStrict(cn, err) + }(node, cmds) } - if len(failedCmds) == 0 { + wg.Wait() + if len(failedCmds.m) == 0 { break } - cmdsMap = failedCmds + cmdsMap = failedCmds.m } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { @@ -1429,37 +1453,41 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } func (c *ClusterClient) txPipelineProcessCmds( - node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { - setCmdsErr(cmds, err) - failedCmds[node] = cmds - return err - } - - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { setCmdsErr(cmds, err) + failedCmds.mu.Lock() + failedCmds.m[node] = cmds + failedCmds.mu.Unlock() return err } - return pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := c.txPipelineReadQueued(rd, cmds, failedCmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return err } func (c *ClusterClient) txPipelineReadQueued( - cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, + rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap, ) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + if err := statusCmd.readReply(rd); err != nil { return err } for _, cmd := range cmds { - err := statusCmd.readReply(cn) + err := statusCmd.readReply(rd) if err == nil { continue } @@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued( } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr @@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued( return nil } -func (c *ClusterClient) pubSub(channels []string) *PubSub { +func (c *ClusterClient) pubSub() *PubSub { var node *clusterNode pubsub := &PubSub{ opt: c.opt.clientOptions(), newConn: func(channels []string) (*pool.Conn, error) { - if node == nil { - var slot int - if len(channels) > 0 { - slot = hashtag.Slot(channels[0]) - } else { - slot = -1 - } + if node != nil { + panic("node != nil") + } - masterNode, err := c.slotMasterNode(slot) - if err != nil { - return nil, err - } - node = masterNode + slot := hashtag.Slot(channels[0]) + + var err error + node, err = c.slotMasterNode(slot) + if err != nil { + return nil, err } - return node.Client.newConn() + + cn, err := node.Client.newConn() + if err != nil { + return nil, err + } + + return cn, nil }, closeConn: func(cn *pool.Conn) error { - return node.Client.connPool.CloseConn(cn) + err := node.Client.connPool.CloseConn(cn) + node = nil + return err }, } pubsub.init() + return pubsub } // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. func (c *ClusterClient) Subscribe(channels ...string) *PubSub { - pubsub := c.pubSub(channels) + pubsub := c.pubSub() if len(channels) > 0 { _ = pubsub.Subscribe(channels...) } @@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub { // PSubscribe subscribes the client to the given patterns. // Patterns can be omitted to create empty subscription. func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { - pubsub := c.pubSub(channels) + pubsub := c.pubSub() if len(channels) > 0 { _ = pubsub.PSubscribe(channels...) } return pubsub } -func useOriginAddr(originAddr, nodeAddr string) bool { - nodeHost, nodePort, err := net.SplitHostPort(nodeAddr) - if err != nil { - return false - } - - nodeIP := net.ParseIP(nodeHost) - if nodeIP == nil { - return false - } - - if !nodeIP.IsLoopback() { - return false - } - - _, originPort, err := net.SplitHostPort(originAddr) - if err != nil { - return false - } - - return nodePort == originPort -} - -func isLoopbackAddr(addr string) bool { - host, _, err := net.SplitHostPort(addr) - if err != nil { - return false - } - - ip := net.ParseIP(host) - if ip == nil { - return false - } - - return ip.IsLoopback() -} - func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { for _, n := range nodes { if n == node { diff --git a/src/dma/vendor/github.com/go-redis/redis/command.go b/src/dma/vendor/github.com/go-redis/redis/command.go index 11472bec..cb4f94b1 100644 --- a/src/dma/vendor/github.com/go-redis/redis/command.go +++ b/src/dma/vendor/github.com/go-redis/redis/command.go @@ -1,16 +1,14 @@ package redis import ( - "bytes" "fmt" + "net" "strconv" "strings" "time" "github.com/go-redis/redis/internal" - "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" - "github.com/go-redis/redis/internal/util" ) type Cmder interface { @@ -18,13 +16,12 @@ type Cmder interface { Args() []interface{} stringArg(int) string - readReply(*pool.Conn) error + readReply(rd *proto.Reader) error setErr(error) readTimeout() *time.Duration Err() error - fmt.Stringer } func setCmdsErr(cmds []Cmder, e error) { @@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) { } } -func firstCmdsErr(cmds []Cmder) error { +func cmdsFirstErr(cmds []Cmder) error { for _, cmd := range cmds { if err := cmd.Err(); err != nil { return err @@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error { return nil } -func writeCmd(cn *pool.Conn, cmds ...Cmder) error { - cn.Wb.Reset() +func writeCmd(wr *proto.Writer, cmds ...Cmder) error { for _, cmd := range cmds { - if err := cn.Wb.Append(cmd.Args()); err != nil { + err := wr.WriteArgs(cmd.Args()) + if err != nil { return err } } - - _, err := cn.Write(cn.Wb.Bytes()) - return err + return nil } func cmdString(cmd Cmder, val interface{}) string { @@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) { return cmd.val, cmd.err } -func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) +func (cmd *Cmd) String() (string, error) { + if cmd.err != nil { + return "", cmd.err + } + switch val := cmd.val.(type) { + case string: + return val, nil + default: + err := fmt.Errorf("redis: unexpected type=%T for String", val) + return "", err + } } -func (cmd *Cmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser) +func (cmd *Cmd) Int() (int, error) { if cmd.err != nil { - return cmd.err + return 0, cmd.err } - if b, ok := cmd.val.([]byte); ok { - // Bytes must be copied, because underlying memory is reused. - cmd.val = string(b) + switch val := cmd.val.(type) { + case int64: + return int(val), nil + case string: + return strconv.Atoi(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int", val) + return 0, err } - return nil +} + +func (cmd *Cmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val, nil + case string: + return strconv.ParseInt(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err + } +} + +func (cmd *Cmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return uint64(val), nil + case string: + return strconv.ParseUint(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + return 0, err + } +} + +func (cmd *Cmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Float64", val) + return 0, err + } +} + +func (cmd *Cmd) Bool() (bool, error) { + if cmd.err != nil { + return false, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val != 0, nil + case string: + return strconv.ParseBool(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Bool", val) + return false, err + } +} + +func (cmd *Cmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadReply(sliceParser) + return cmd.err +} + +// Implements proto.MultiBulkParse +func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { + vals := make([]interface{}, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(sliceParser) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue + } + return nil, err + } + + switch v := v.(type) { + case string: + vals = append(vals, v) + default: + vals = append(vals, v) + } + } + return vals, nil } //------------------------------------------------------------------------------ @@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SliceCmd) readReply(cn *pool.Conn) error { +func (cmd *SliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(sliceParser) + v, cmd.err = rd.ReadArrayReply(sliceParser) if cmd.err != nil { return cmd.err } @@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadStringReply() +func (cmd *StatusCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadIntReply() +func (cmd *IntCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadIntReply() return cmd.err } @@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) readReply(cn *pool.Conn) error { +func (cmd *DurationCmd) readReply(rd *proto.Reader) error { var n int64 - n, cmd.err = cn.Rd.ReadIntReply() + n, cmd.err = rd.ReadIntReply() if cmd.err != nil { return cmd.err } @@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *TimeCmd) readReply(cn *pool.Conn) error { +func (cmd *TimeCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(timeParser) + v, cmd.err = rd.ReadArrayReply(timeParser) if cmd.err != nil { return cmd.err } @@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func timeParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d elements, expected 2", n) + } + + sec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + microsec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + return time.Unix(sec, microsec*1000), nil +} + //------------------------------------------------------------------------------ type BoolCmd struct { @@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string { return cmdString(cmd, cmd.val) } -var ok = []byte("OK") - -func (cmd *BoolCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadReply(nil) + v, cmd.err = rd.ReadReply(nil) // `SET key value NX` returns nil when key already exists. But // `SETNX key value` returns bool (0/1). So convert nil to bool. // TODO: is this okay? @@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { case int64: cmd.val = v == 1 return nil - case []byte: - cmd.val = bytes.Equal(v, ok) + case string: + cmd.val = v == "OK" return nil default: cmd.err = fmt.Errorf("got %T, wanted int64 or string", v) @@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { type StringCmd struct { baseCmd - val []byte + val string } var _ Cmder = (*StringCmd)(nil) @@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd { } func (cmd *StringCmd) Val() string { - return util.BytesToString(cmd.val) + return cmd.val } func (cmd *StringCmd) Result() (string, error) { @@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) { } func (cmd *StringCmd) Bytes() ([]byte, error) { - return cmd.val, cmd.err + return []byte(cmd.val), cmd.err +} + +func (cmd *StringCmd) Int() (int, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.Atoi(cmd.Val()) } func (cmd *StringCmd) Int64() (int64, error) { @@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error { if cmd.err != nil { return cmd.err } - return proto.Scan(cmd.val, val) + return proto.Scan([]byte(cmd.val), val) } func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadBytesReply() +func (cmd *StringCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadFloatReply() +func (cmd *FloatCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadFloatReply() return cmd.err } @@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { return proto.ScanSlice(cmd.Val(), container) } -func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser) + v, cmd.err = rd.ReadArrayReply(stringSliceParser) if cmd.err != nil { return cmd.err } @@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ss := make([]string, 0, n) + for i := int64(0); i < n; i++ { + s, err := rd.ReadString() + if err == Nil { + ss = append(ss, "") + } else if err != nil { + return nil, err + } else { + ss = append(ss, s) + } + } + return ss, nil +} + //------------------------------------------------------------------------------ type BoolSliceCmd struct { @@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser) + v, cmd.err = rd.ReadArrayReply(boolSliceParser) if cmd.err != nil { return cmd.err } @@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + bools := make([]bool, 0, n) + for i := int64(0); i < n; i++ { + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + bools = append(bools, n == 1) + } + return bools, nil +} + //------------------------------------------------------------------------------ type StringStringMapCmd struct { @@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser) + v, cmd.err = rd.ReadArrayReply(stringStringMapParser) if cmd.err != nil { return cmd.err } @@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]string, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ type StringIntMapCmd struct { @@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser) + v, cmd.err = rd.ReadArrayReply(stringIntMapParser) if cmd.err != nil { return cmd.err } @@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]int64, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + m[key] = n + } + return m, nil +} + //------------------------------------------------------------------------------ type StringStructMapCmd struct { @@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser) + v, cmd.err = rd.ReadArrayReply(stringStructMapParser) if cmd.err != nil { return cmd.err } @@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { return nil } -//------------------------------------------------------------------------------ +// Implements proto.MultiBulkParse +func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]struct{}, n) + for i := int64(0); i < n; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } -type XStream struct { - Stream string - Messages []*XMessage + m[key] = struct{}{} + } + return m, nil } +//------------------------------------------------------------------------------ + type XMessage struct { ID string Values map[string]interface{} } +type XMessageSliceCmd struct { + baseCmd + + val []XMessage +} + +var _ Cmder = (*XMessageSliceCmd)(nil) + +func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { + return &XMessageSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XMessageSliceCmd) Val() []XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XMessage) + return nil +} + +// Implements proto.MultiBulkParse +func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + msgs := make([]XMessage, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + return nil, err + } + + msgs = append(msgs, XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return msgs, nil +} + +// Implements proto.MultiBulkParse +func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]interface{}, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ +type XStream struct { + Stream string + Messages []XMessage +} + type XStreamSliceCmd struct { baseCmd - val []*XStream + val []XStream } var _ Cmder = (*XStreamSliceCmd)(nil) @@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { } } -func (cmd *XStreamSliceCmd) Val() []*XStream { +func (cmd *XStreamSliceCmd) Val() []XStream { return cmd.val } -func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) { +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { return cmd.val, cmd.err } @@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) + v, cmd.err = rd.ReadArrayReply(xStreamSliceParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XStream) + cmd.val = v.([]XStream) return nil } // Implements proto.MultiBulkParse func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - xx := make([]*XStream, n) + ret := make([]XStream, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xStreamParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + ret = append(ret, XStream{ + Stream: stream, + Messages: v.([]XMessage), + }) + return nil, nil + }) if err != nil { return nil, err } - xx[i] = v.(*XStream) } - return xx, nil + return ret, nil } -// Implements proto.MultiBulkParse -func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) +//------------------------------------------------------------------------------ + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + baseCmd + val *XPending +} + +var _ Cmder = (*XPendingCmd)(nil) + +func NewXPendingCmd(args ...interface{}) *XPendingCmd { + return &XPendingCmd{ + baseCmd: baseCmd{_args: args}, } +} + +func (cmd *XPendingCmd) Val() *XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (*XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.(*XPending) + return nil +} - stream, err := rd.ReadStringReply() +func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + count, err := rd.ReadIntReply() if err != nil { return nil, err } - v, err := rd.ReadArrayReply(xMessageSliceParser) - if err != nil { + lower, err := rd.ReadString() + if err != nil && err != Nil { return nil, err } - return &XStream{ - Stream: stream, - Messages: v.([]*XMessage), - }, nil + higher, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + pending := &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + for i := int64(0); i < n; i++ { + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + consumerName, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumerPending, err := rd.ReadInt() + if err != nil { + return nil, err + } + + if pending.Consumers == nil { + pending.Consumers = make(map[string]int64) + } + pending.Consumers[consumerName] = consumerPending + + return nil, nil + }) + if err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil && err != Nil { + return nil, err + } + + return pending, nil } //------------------------------------------------------------------------------ -type XMessageSliceCmd struct { - baseCmd +type XPendingExt struct { + Id string + Consumer string + Idle time.Duration + RetryCount int64 +} - val []*XMessage +type XPendingExtCmd struct { + baseCmd + val []XPendingExt } -var _ Cmder = (*XMessageSliceCmd)(nil) +var _ Cmder = (*XPendingExtCmd)(nil) -func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { - return &XMessageSliceCmd{ +func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { + return &XPendingExtCmd{ baseCmd: baseCmd{_args: args}, } } -func (cmd *XMessageSliceCmd) Val() []*XMessage { +func (cmd *XPendingExtCmd) Val() []XPendingExt { return cmd.val } -func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) { +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { return cmd.val, cmd.err } -func (cmd *XMessageSliceCmd) String() string { +func (cmd *XPendingExtCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { - var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser) +func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XMessage) + cmd.val = info.([]XPendingExt) return nil } -// Implements proto.MultiBulkParse -func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - msgs := make([]*XMessage, n) +func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XPendingExt, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xMessageParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumer, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + idle, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + retryCount, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + ret = append(ret, XPendingExt{ + Id: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + return nil, nil + }) if err != nil { return nil, err } - msgs[i] = v.(*XMessage) } - return msgs, nil + return ret, nil } -// Implements proto.MultiBulkParse -func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } +//------------------------------------------------------------------------------ - v, err := rd.ReadArrayReply(xKeyValueParser) - if err != nil { - return nil, err +//------------------------------------------------------------------------------ + +type ZSliceCmd struct { + baseCmd + + val []Z +} + +var _ Cmder = (*ZSliceCmd)(nil) + +func NewZSliceCmd(args ...interface{}) *ZSliceCmd { + return &ZSliceCmd{ + baseCmd: baseCmd{_args: args}, } +} + +func (cmd *ZSliceCmd) Val() []Z { + return cmd.val +} - return &XMessage{ - ID: id, - Values: v.(map[string]interface{}), - }, nil +func (cmd *ZSliceCmd) Result() ([]Z, error) { + return cmd.val, cmd.err +} + +func (cmd *ZSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(zSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]Z) + return nil } // Implements proto.MultiBulkParse -func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) { - values := make(map[string]interface{}, n) +func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + zz := make([]Z, n/2) for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() + var err error + + z := &zz[i/2] + + z.Member, err = rd.ReadString() if err != nil { return nil, err } - value, err := rd.ReadStringReply() + z.Score, err = rd.ReadFloatReply() if err != nil { return nil, err } - - values[key] = value } - return values, nil + return zz, nil } //------------------------------------------------------------------------------ -type ZSliceCmd struct { +type ZWithKeyCmd struct { baseCmd - val []Z + val ZWithKey } -var _ Cmder = (*ZSliceCmd)(nil) +var _ Cmder = (*ZWithKeyCmd)(nil) -func NewZSliceCmd(args ...interface{}) *ZSliceCmd { - return &ZSliceCmd{ +func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { + return &ZWithKeyCmd{ baseCmd: baseCmd{_args: args}, } } -func (cmd *ZSliceCmd) Val() []Z { +func (cmd *ZWithKeyCmd) Val() ZWithKey { return cmd.val } -func (cmd *ZSliceCmd) Result() ([]Z, error) { - return cmd.val, cmd.err +func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) { + return cmd.Val(), cmd.Err() } -func (cmd *ZSliceCmd) String() string { +func (cmd *ZWithKeyCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser) + v, cmd.err = rd.ReadArrayReply(zWithKeyParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]Z) + cmd.val = v.(ZWithKey) return nil } +// Implements proto.MultiBulkParse +func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 3 { + return nil, fmt.Errorf("got %d elements, expected 3", n) + } + + var z ZWithKey + var err error + + z.Key, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + return z, nil +} + //------------------------------------------------------------------------------ type ScanCmd struct { @@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.page) } -func (cmd *ScanCmd) readReply(cn *pool.Conn) error { - cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply() +func (cmd *ScanCmd) readReply(rd *proto.Reader) error { + cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() return cmd.err } @@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { +func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser) + v, cmd.err = rd.ReadArrayReply(clusterSlotsParser) if cmd.err != nil { return cmd.err } @@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { + slots := make([]ClusterSlot, n) + for i := 0; i < len(slots); i++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n < 2 { + err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + return nil, err + } + + start, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + end, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 && n != 3 { + err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) + return nil, err + } + + ip, err := rd.ReadString() + if err != nil { + return nil, err + } + + port, err := rd.ReadString() + if err != nil { + return nil, err + } + + nodes[j].Addr = net.JoinHostPort(ip, port) + + if n == 3 { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + nodes[j].Id = id + } + } + + slots[i] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } + } + return slots, nil +} + //------------------------------------------------------------------------------ // GeoLocation is used with GeoAdd to add geospatial location. @@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string { return cmdString(cmd, cmd.locations) } -func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) + v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) if cmd.err != nil { return cmd.err } @@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { return nil } +func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + var loc GeoLocation + var err error + + loc.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + if q.WithDist { + loc.Dist, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + if q.WithGeoHash { + loc.GeoHash, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + } + if q.WithCoord { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d coordinates, expected 2", n) + } + + loc.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + loc.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + + return &loc, nil + } +} + +func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + locs := make([]GeoLocation, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(newGeoLocationParser(q)) + if err != nil { + return nil, err + } + switch vv := v.(type) { + case string: + locs = append(locs, GeoLocation{ + Name: vv, + }) + case *GeoLocation: + locs = append(locs, *vv) + default: + return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) + } + } + return locs, nil + } +} + //------------------------------------------------------------------------------ type GeoPos struct { @@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string { return cmdString(cmd, cmd.positions) } -func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser) + v, cmd.err = rd.ReadArrayReply(geoPosSliceParser) if cmd.err != nil { return cmd.err } @@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { return nil } +func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + positions := make([]*GeoPos, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(geoPosParser) + if err != nil { + if err == Nil { + positions = append(positions, nil) + continue + } + return nil, err + } + switch v := v.(type) { + case *GeoPos: + positions = append(positions, v) + default: + return nil, fmt.Errorf("got %T, expected *GeoPos", v) + } + } + return positions, nil +} + +func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { + var pos GeoPos + var err error + + pos.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + pos.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + return &pos, nil +} + //------------------------------------------------------------------------------ type CommandInfo struct { @@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { +func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser) + v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser) if cmd.err != nil { return cmd.err } @@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} + +func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) + } + + cmd.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + + arity, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := rd.ReadReply(stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + //------------------------------------------------------------------------------ type cmdsInfoCache struct { diff --git a/src/dma/vendor/github.com/go-redis/redis/commands.go b/src/dma/vendor/github.com/go-redis/redis/commands.go index dddf8acd..653e4abe 100644 --- a/src/dma/vendor/github.com/go-redis/redis/commands.go +++ b/src/dma/vendor/github.com/go-redis/redis/commands.go @@ -8,13 +8,6 @@ import ( "github.com/go-redis/redis/internal" ) -func readTimeout(timeout time.Duration) time.Duration { - if timeout == 0 { - return 0 - } - return timeout + 10*time.Second -} - func usePrecise(dur time.Duration) bool { return dur < time.Second || dur%time.Second != 0 } @@ -172,16 +165,30 @@ type Cmdable interface { SRem(key string, members ...interface{}) *IntCmd SUnion(keys ...string) *StringSliceCmd SUnionStore(destination string, keys ...string) *IntCmd - XAdd(stream, id string, els map[string]interface{}) *StringCmd - XAddExt(opt *XAddExt) *StringCmd - XLen(key string) *IntCmd + XAdd(a *XAddArgs) *StringCmd + XDel(stream string, ids ...string) *IntCmd + XLen(stream string) *IntCmd XRange(stream, start, stop string) *XMessageSliceCmd XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd XRevRange(stream string, start, stop string) *XMessageSliceCmd XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd - XRead(streams ...string) *XStreamSliceCmd - XReadN(count int64, streams ...string) *XStreamSliceCmd - XReadExt(opt *XReadExt) *XStreamSliceCmd + XRead(a *XReadArgs) *XStreamSliceCmd + XReadStreams(streams ...string) *XStreamSliceCmd + XGroupCreate(stream, group, start string) *StatusCmd + XGroupCreateMkStream(stream, group, start string) *StatusCmd + XGroupSetID(stream, group, start string) *StatusCmd + XGroupDestroy(stream, group string) *IntCmd + XGroupDelConsumer(stream, group, consumer string) *IntCmd + XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd + XAck(stream, group string, ids ...string) *IntCmd + XPending(stream, group string) *XPendingCmd + XPendingExt(a *XPendingExtArgs) *XPendingExtCmd + XClaim(a *XClaimArgs) *XMessageSliceCmd + XClaimJustID(a *XClaimArgs) *StringSliceCmd + XTrim(key string, maxLen int64) *IntCmd + XTrimApprox(key string, maxLen int64) *IntCmd + BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd + BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd ZAdd(key string, members ...Z) *IntCmd ZAddNX(key string, members ...Z) *IntCmd ZAddXX(key string, members ...Z) *IntCmd @@ -196,6 +203,8 @@ type Cmdable interface { ZLexCount(key, min, max string) *IntCmd ZIncrBy(key string, increment float64, member string) *FloatCmd ZInterStore(destination string, store ZStore, keys ...string) *IntCmd + ZPopMax(key string, count ...int64) *ZSliceCmd + ZPopMin(key string, count ...int64) *ZSliceCmd ZRange(key string, start, stop int64) *StringSliceCmd ZRangeWithScores(key string, start, stop int64) *ZSliceCmd ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd @@ -223,6 +232,7 @@ type Cmdable interface { ClientKillByFilter(keys ...string) *IntCmd ClientList() *StringCmd ClientPause(dur time.Duration) *BoolCmd + ClientID() *IntCmd ConfigGet(parameter string) *SliceCmd ConfigResetStat() *StatusCmd ConfigSet(parameter, value string) *StatusCmd @@ -260,6 +270,7 @@ type Cmdable interface { ClusterResetHard() *StatusCmd ClusterInfo() *StringCmd ClusterKeySlot(key string) *IntCmd + ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd ClusterCountFailureReports(nodeID string) *IntCmd ClusterCountKeysInSlot(slot int) *IntCmd ClusterDelSlots(slots ...int) *StatusCmd @@ -1300,7 +1311,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd { //------------------------------------------------------------------------------ -type XAddExt struct { +type XAddArgs struct { Stream string MaxLen int64 // MAXLEN N MaxLenApprox int64 // MAXLEN ~ N @@ -1308,40 +1319,42 @@ type XAddExt struct { Values map[string]interface{} } -func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd { - a := make([]interface{}, 0, 6+len(opt.Values)*2) - a = append(a, "xadd") - a = append(a, opt.Stream) - if opt.MaxLen > 0 { - a = append(a, "maxlen", opt.MaxLen) - } else if opt.MaxLenApprox > 0 { - a = append(a, "maxlen", "~", opt.MaxLenApprox) +func (c *cmdable) XAdd(a *XAddArgs) *StringCmd { + args := make([]interface{}, 0, 6+len(a.Values)*2) + args = append(args, "xadd") + args = append(args, a.Stream) + if a.MaxLen > 0 { + args = append(args, "maxlen", a.MaxLen) + } else if a.MaxLenApprox > 0 { + args = append(args, "maxlen", "~", a.MaxLenApprox) } - if opt.ID != "" { - a = append(a, opt.ID) + if a.ID != "" { + args = append(args, a.ID) } else { - a = append(a, "*") + args = append(args, "*") } - for k, v := range opt.Values { - a = append(a, k) - a = append(a, v) + for k, v := range a.Values { + args = append(args, k) + args = append(args, v) } - cmd := NewStringCmd(a...) + cmd := NewStringCmd(args...) c.process(cmd) return cmd } -func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd { - return c.XAddExt(&XAddExt{ - Stream: stream, - ID: id, - Values: values, - }) +func (c *cmdable) XDel(stream string, ids ...string) *IntCmd { + args := []interface{}{"xdel", stream} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd } -func (c *cmdable) XLen(key string) *IntCmd { - cmd := NewIntCmd("xlen", key) +func (c *cmdable) XLen(stream string) *IntCmd { + cmd := NewIntCmd("xlen", stream) c.process(cmd) return cmd } @@ -1370,55 +1383,190 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS return cmd } -type XReadExt struct { +type XReadArgs struct { Streams []string Count int64 Block time.Duration } -func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd { - a := make([]interface{}, 0, 5+len(opt.Streams)) - a = append(a, "xread") - if opt != nil { - if opt.Count > 0 { - a = append(a, "count") - a = append(a, opt.Count) - } - if opt.Block >= 0 { - a = append(a, "block") - a = append(a, int64(opt.Block/time.Millisecond)) - } +func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 5+len(a.Streams)) + args = append(args, "xread") + if a.Count > 0 { + args = append(args, "count") + args = append(args, a.Count) + } + if a.Block >= 0 { + args = append(args, "block") + args = append(args, int64(a.Block/time.Millisecond)) } - a = append(a, "streams") - for _, s := range opt.Streams { - a = append(a, s) + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) } - cmd := NewXStreamSliceCmd(a...) + cmd := NewXStreamSliceCmd(args...) + if a.Block >= 0 { + cmd.setReadTimeout(a.Block) + } c.process(cmd) return cmd } -func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ +func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd { + return c.XRead(&XReadArgs{ Streams: streams, Block: -1, }) } -func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ - Streams: streams, - Count: count, - Block: -1, - }) +func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start) + c.process(cmd) + return cmd } -func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd { - return c.XReadExt(&XReadExt{ - Streams: streams, - Block: block, - }) +func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream") + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd { + cmd := NewStatusCmd("xgroup", "setid", stream, group, start) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd { + cmd := NewIntCmd("xgroup", "destroy", stream, group) + c.process(cmd) + return cmd +} + +func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd { + cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer) + c.process(cmd) + return cmd +} + +type XReadGroupArgs struct { + Group string + Consumer string + // List of streams and ids. + Streams []string + Count int64 + Block time.Duration + NoAck bool +} + +func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd { + args := make([]interface{}, 0, 8+len(a.Streams)) + args = append(args, "xreadgroup", "group", a.Group, a.Consumer) + if a.Count > 0 { + args = append(args, "count", a.Count) + } + if a.Block >= 0 { + args = append(args, "block", int64(a.Block/time.Millisecond)) + } + if a.NoAck { + args = append(args, "noack") + } + args = append(args, "streams") + for _, s := range a.Streams { + args = append(args, s) + } + + cmd := NewXStreamSliceCmd(args...) + if a.Block >= 0 { + cmd.setReadTimeout(a.Block) + } + c.process(cmd) + return cmd +} + +func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd { + args := []interface{}{"xack", stream, group} + for _, id := range ids { + args = append(args, id) + } + cmd := NewIntCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XPending(stream, group string) *XPendingCmd { + cmd := NewXPendingCmd("xpending", stream, group) + c.process(cmd) + return cmd +} + +type XPendingExtArgs struct { + Stream string + Group string + Start string + End string + Count int64 + Consumer string +} + +func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count) + if a.Consumer != "" { + args = append(args, a.Consumer) + } + cmd := NewXPendingExtCmd(args...) + c.process(cmd) + return cmd +} + +type XClaimArgs struct { + Stream string + Group string + Consumer string + MinIdle time.Duration + Messages []string +} + +func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd { + args := xClaimArgs(a) + cmd := NewXMessageSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd { + args := xClaimArgs(a) + args = append(args, "justid") + cmd := NewStringSliceCmd(args...) + c.process(cmd) + return cmd +} + +func xClaimArgs(a *XClaimArgs) []interface{} { + args := make([]interface{}, 0, 4+len(a.Messages)) + args = append(args, + "xclaim", + a.Stream, + a.Group, a.Consumer, + int64(a.MinIdle/time.Millisecond)) + for _, id := range a.Messages { + args = append(args, id) + } + return args +} + +func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", maxLen) + c.process(cmd) + return cmd +} + +func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd { + cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen) + c.process(cmd) + return cmd } //------------------------------------------------------------------------------ @@ -1429,6 +1577,12 @@ type Z struct { Member interface{} } +// ZWithKey represents sorted set member including the name of the key where it was popped. +type ZWithKey struct { + Z + Key string +} + // ZStore is used as an arg to ZInterStore and ZUnionStore. type ZStore struct { Weights []float64 @@ -1436,6 +1590,34 @@ type ZStore struct { Aggregate string } +// Redis `BZPOPMAX key [key ...] timeout` command. +func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmax" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + +// Redis `BZPOPMIN key [key ...] timeout` command. +func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd { + args := make([]interface{}, 1+len(keys)+1) + args[0] = "bzpopmin" + for i, key := range keys { + args[1+i] = key + } + args[len(args)-1] = formatSec(timeout) + cmd := NewZWithKeyCmd(args...) + cmd.setReadTimeout(timeout) + c.process(cmd) + return cmd +} + func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd { for i, m := range members { a[n+2*i] = m.Score @@ -1574,6 +1756,46 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string) return cmd } +func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd { + args := []interface{}{ + "zpopmax", + key, + } + + switch len(count) { + case 0: + break + case 1: + args = append(args, count[0]) + default: + panic("too many arguments") + } + + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + +func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd { + args := []interface{}{ + "zpopmin", + key, + } + + switch len(count) { + case 0: + break + case 1: + args = append(args, count[0]) + default: + panic("too many arguments") + } + + cmd := NewZSliceCmd(args...) + c.process(cmd) + return cmd +} + func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd { args := []interface{}{ "zrange", @@ -1849,6 +2071,24 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd { return cmd } +func (c *cmdable) ClientID() *IntCmd { + cmd := NewIntCmd("client", "id") + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientUnblock(id int64) *IntCmd { + cmd := NewIntCmd("client", "unblock", id) + c.process(cmd) + return cmd +} + +func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd { + cmd := NewIntCmd("client", "unblock", id, "error") + c.process(cmd) + return cmd +} + // ClientSetName assigns a name to the connection. func (c *statefulCmdable) ClientSetName(name string) *BoolCmd { cmd := NewBoolCmd("client", "setname", name) @@ -2164,6 +2404,12 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd { return cmd } +func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd { + cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count) + c.process(cmd) + return cmd +} + func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd { cmd := NewIntCmd("cluster", "count-failure-reports", nodeID) c.process(cmd) diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/error.go b/src/dma/vendor/github.com/go-redis/redis/internal/error.go index e0ff8632..34f6bd4d 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/error.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/error.go @@ -8,9 +8,18 @@ import ( "github.com/go-redis/redis/internal/proto" ) -func IsRetryableError(err error, retryNetError bool) bool { - if IsNetworkError(err) { - return retryNetError +func IsRetryableError(err error, retryTimeout bool) bool { + if err == nil { + return false + } + if err == io.EOF { + return true + } + if netErr, ok := err.(net.Error); ok { + if netErr.Timeout() { + return retryTimeout + } + return true } s := err.Error() if s == "ERR max number of clients reached" { @@ -33,20 +42,13 @@ func IsRedisError(err error) bool { return ok } -func IsNetworkError(err error) bool { - if err == io.EOF { - return true - } - _, ok := err.(net.Error) - return ok -} - func IsBadConn(err error, allowTimeout bool) bool { if err == nil { return false } if IsRedisError(err) { - return strings.HasPrefix(err.Error(), "READONLY ") + // #790 + return IsReadOnlyError(err) } if allowTimeout { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { @@ -81,3 +83,7 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { func IsLoadingError(err error) bool { return strings.HasPrefix(err.Error(), "LOADING ") } + +func IsReadOnlyError(err error) bool { + return strings.HasPrefix(err.Error(), "READONLY ") +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go index acaf3665..1095bfe5 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/conn.go @@ -13,19 +13,21 @@ var noDeadline = time.Time{} type Conn struct { netConn net.Conn - Rd *proto.Reader - Wb *proto.WriteBuffer + rd *proto.Reader + rdLocked bool + wr *proto.Writer - Inited bool - usedAt atomic.Value + InitedAt time.Time + pooled bool + usedAt atomic.Value } func NewConn(netConn net.Conn) *Conn { cn := &Conn{ netConn: netConn, - Wb: proto.NewWriteBuffer(), } - cn.Rd = proto.NewReader(cn.netConn) + cn.rd = proto.NewReader(netConn) + cn.wr = proto.NewWriter(netConn) cn.SetUsedAt(time.Now()) return cn } @@ -40,31 +42,26 @@ func (cn *Conn) SetUsedAt(tm time.Time) { func (cn *Conn) SetNetConn(netConn net.Conn) { cn.netConn = netConn - cn.Rd.Reset(netConn) + cn.rd.Reset(netConn) + cn.wr.Reset(netConn) } -func (cn *Conn) IsStale(timeout time.Duration) bool { - return timeout > 0 && time.Since(cn.UsedAt()) > timeout -} - -func (cn *Conn) SetReadTimeout(timeout time.Duration) { +func (cn *Conn) setReadTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetReadDeadline(now.Add(timeout)) - } else { - cn.netConn.SetReadDeadline(noDeadline) + return cn.netConn.SetReadDeadline(now.Add(timeout)) } + return cn.netConn.SetReadDeadline(noDeadline) } -func (cn *Conn) SetWriteTimeout(timeout time.Duration) { +func (cn *Conn) setWriteTimeout(timeout time.Duration) error { now := time.Now() cn.SetUsedAt(now) if timeout > 0 { - cn.netConn.SetWriteDeadline(now.Add(timeout)) - } else { - cn.netConn.SetWriteDeadline(noDeadline) + return cn.netConn.SetWriteDeadline(now.Add(timeout)) } + return cn.netConn.SetWriteDeadline(noDeadline) } func (cn *Conn) Write(b []byte) (int, error) { @@ -75,6 +72,22 @@ func (cn *Conn) RemoteAddr() net.Addr { return cn.netConn.RemoteAddr() } +func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error { + _ = cn.setReadTimeout(timeout) + return fn(cn.rd) +} + +func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error { + _ = cn.setWriteTimeout(timeout) + + firstErr := fn(cn.wr) + err := cn.wr.Flush() + if err != nil && firstErr == nil { + firstErr = err + } + return firstErr +} + func (cn *Conn) Close() error { return cn.netConn.Close() } diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go index cab66904..9cecee8a 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -28,7 +28,6 @@ type Stats struct { Timeouts uint32 // number of times a wait timeout occurred TotalConns uint32 // number of total connections in the pool - FreeConns uint32 // deprecated - use IdleConns IdleConns uint32 // number of idle connections in the pool StaleConns uint32 // number of stale connections removed from the pool } @@ -53,6 +52,8 @@ type Options struct { OnClose func(*Conn) error PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -63,16 +64,16 @@ type ConnPool struct { dialErrorsNum uint32 // atomic - lastDialError error lastDialErrorMu sync.RWMutex + lastDialError error queue chan struct{} - connsMu sync.Mutex - conns []*Conn - - idleConnsMu sync.RWMutex - idleConns []*Conn + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + poolSize int + idleConnsLen int stats Stats @@ -90,6 +91,10 @@ func NewConnPool(opt *Options) *ConnPool { idleConns: make([]*Conn, 0, opt.PoolSize), } + for i := 0; i < opt.MinIdleConns; i++ { + p.checkMinIdleConns() + } + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } @@ -97,19 +102,53 @@ func NewConnPool(opt *Options) *ConnPool { return p } +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go p.addIdleConn() + } +} + +func (p *ConnPool) addIdleConn() { + cn, err := p.newConn(true) + if err != nil { + return + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() +} + func (p *ConnPool) NewConn() (*Conn, error) { - cn, err := p.newConn() + return p._NewConn(false) +} + +func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) { + cn, err := p.newConn(pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) + if pooled { + if p.poolSize < p.opt.PoolSize { + p.poolSize++ + } else { + cn.pooled = false + } + } p.connsMu.Unlock() return cn, nil } -func (p *ConnPool) newConn() (*Conn, error) { +func (p *ConnPool) newConn(pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed } @@ -127,7 +166,9 @@ func (p *ConnPool) newConn() (*Conn, error) { return nil, err } - return NewConn(netConn), nil + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil } func (p *ConnPool) tryDial() { @@ -174,16 +215,16 @@ func (p *ConnPool) Get() (*Conn, error) { } for { - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.popIdle() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn == nil { break } - if cn.IsStale(p.opt.IdleTimeout) { - p.CloseConn(cn) + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) continue } @@ -193,7 +234,7 @@ func (p *ConnPool) Get() (*Conn, error) { atomic.AddUint32(&p.stats.Misses, 1) - newcn, err := p.NewConn() + newcn, err := p._NewConn(true) if err != nil { p.freeTurn() return nil, err @@ -241,21 +282,21 @@ func (p *ConnPool) popIdle() *Conn { idx := len(p.idleConns) - 1 cn := p.idleConns[idx] p.idleConns = p.idleConns[:idx] - + p.idleConnsLen-- + p.checkMinIdleConns() return cn } func (p *ConnPool) Put(cn *Conn) { - buf := cn.Rd.PeekBuffered() - if buf != nil { - internal.Logf("connection has unread data: %.100q", buf) + if !cn.pooled { p.Remove(cn) return } - p.idleConnsMu.Lock() + p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) - p.idleConnsMu.Unlock() + p.idleConnsLen++ + p.connsMu.Unlock() p.freeTurn() } @@ -275,6 +316,10 @@ func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } break } } @@ -291,17 +336,17 @@ func (p *ConnPool) closeConn(cn *Conn) error { // Len returns total number of connections. func (p *ConnPool) Len() int { p.connsMu.Lock() - l := len(p.conns) + n := len(p.conns) p.connsMu.Unlock() - return l + return n } -// FreeLen returns number of idle connections. +// IdleLen returns number of idle connections. func (p *ConnPool) IdleLen() int { - p.idleConnsMu.RLock() - l := len(p.idleConns) - p.idleConnsMu.RUnlock() - return l + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n } func (p *ConnPool) Stats() *Stats { @@ -312,7 +357,6 @@ func (p *ConnPool) Stats() *Stats { Timeouts: atomic.LoadUint32(&p.stats.Timeouts), TotalConns: uint32(p.Len()), - FreeConns: uint32(idleLen), IdleConns: uint32(idleLen), StaleConns: atomic.LoadUint32(&p.stats.StaleConns), } @@ -349,11 +393,10 @@ func (p *ConnPool) Close() error { } } p.conns = nil - p.connsMu.Unlock() - - p.idleConnsMu.Lock() + p.poolSize = 0 p.idleConns = nil - p.idleConnsMu.Unlock() + p.idleConnsLen = 0 + p.connsMu.Unlock() return firstErr } @@ -364,11 +407,12 @@ func (p *ConnPool) reapStaleConn() *Conn { } cn := p.idleConns[0] - if !cn.IsStale(p.opt.IdleTimeout) { + if !p.isStaleConn(cn) { return nil } p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- return cn } @@ -378,9 +422,9 @@ func (p *ConnPool) ReapStaleConns() (int, error) { for { p.getTurn() - p.idleConnsMu.Lock() + p.connsMu.Lock() cn := p.reapStaleConn() - p.idleConnsMu.Unlock() + p.connsMu.Unlock() if cn != nil { p.removeConn(cn) @@ -414,3 +458,19 @@ func (p *ConnPool) reaper(frequency time.Duration) { atomic.AddUint32(&p.stats.StaleConns, uint32(n)) } } + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.InitedAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go index 8c28c7b7..896b6f65 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/reader.go @@ -9,8 +9,6 @@ import ( "github.com/go-redis/redis/internal/util" ) -const bytesAllocLimit = 1024 * 1024 // 1mb - const ( ErrorReply = '-' StatusReply = '+' @@ -32,40 +30,23 @@ func (e RedisError) Error() string { return string(e) } type MultiBulkParse func(*Reader, int64) (interface{}, error) type Reader struct { - src *bufio.Reader - buf []byte + rd *bufio.Reader + _buf []byte } func NewReader(rd io.Reader) *Reader { return &Reader{ - src: bufio.NewReader(rd), - buf: make([]byte, 4096), + rd: bufio.NewReader(rd), + _buf: make([]byte, 64), } } func (r *Reader) Reset(rd io.Reader) { - r.src.Reset(rd) -} - -func (r *Reader) PeekBuffered() []byte { - if n := r.src.Buffered(); n != 0 { - b, _ := r.src.Peek(n) - return b - } - return nil -} - -func (r *Reader) ReadN(n int) ([]byte, error) { - b, err := readN(r.src, r.buf, n) - if err != nil { - return nil, err - } - r.buf = b - return b, nil + r.rd.Reset(rd) } func (r *Reader) ReadLine() ([]byte, error) { - line, isPrefix, err := r.src.ReadLine() + line, isPrefix, err := r.rd.ReadLine() if err != nil { return nil, err } @@ -91,11 +72,11 @@ func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) { case ErrorReply: return nil, ParseErrorReply(line) case StatusReply: - return parseTmpStatusReply(line), nil + return string(line[1:]), nil case IntReply: return util.ParseInt(line[1:], 10, 64) case StringReply: - return r.readTmpBytesReply(line) + return r.readStringReply(line) case ArrayReply: n, err := parseArrayLen(line) if err != nil { @@ -121,47 +102,42 @@ func (r *Reader) ReadIntReply() (int64, error) { } } -func (r *Reader) ReadTmpBytesReply() ([]byte, error) { +func (r *Reader) ReadString() (string, error) { line, err := r.ReadLine() if err != nil { - return nil, err + return "", err } switch line[0] { case ErrorReply: - return nil, ParseErrorReply(line) + return "", ParseErrorReply(line) case StringReply: - return r.readTmpBytesReply(line) + return r.readStringReply(line) case StatusReply: - return parseTmpStatusReply(line), nil + return string(line[1:]), nil + case IntReply: + return string(line[1:]), nil default: - return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) + return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line) } } -func (r *Reader) ReadBytesReply() ([]byte, error) { - b, err := r.ReadTmpBytesReply() - if err != nil { - return nil, err +func (r *Reader) readStringReply(line []byte) (string, error) { + if isNilReply(line) { + return "", Nil } - cp := make([]byte, len(b)) - copy(cp, b) - return cp, nil -} -func (r *Reader) ReadStringReply() (string, error) { - b, err := r.ReadTmpBytesReply() + replyLen, err := strconv.Atoi(string(line[1:])) if err != nil { return "", err } - return string(b), nil -} -func (r *Reader) ReadFloatReply() (float64, error) { - b, err := r.ReadTmpBytesReply() + b := make([]byte, replyLen+2) + _, err = io.ReadFull(r.rd, b) if err != nil { - return 0, err + return "", err } - return util.ParseFloat(b, 64) + + return util.BytesToString(b[:replyLen]), nil } func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) { @@ -219,7 +195,7 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { keys := make([]string, n) for i := int64(0); i < n; i++ { - key, err := r.ReadStringReply() + key, err := r.ReadString() if err != nil { return nil, 0, err } @@ -229,69 +205,71 @@ func (r *Reader) ReadScanReply() ([]string, uint64, error) { return keys, cursor, err } -func (r *Reader) readTmpBytesReply(line []byte) ([]byte, error) { - if isNilReply(line) { - return nil, Nil - } - - replyLen, err := strconv.Atoi(string(line[1:])) +func (r *Reader) ReadInt() (int64, error) { + b, err := r.readTmpBytesReply() if err != nil { - return nil, err + return 0, err } + return util.ParseInt(b, 10, 64) +} - b, err := r.ReadN(replyLen + 2) +func (r *Reader) ReadUint() (uint64, error) { + b, err := r.readTmpBytesReply() if err != nil { - return nil, err + return 0, err } - return b[:replyLen], nil + return util.ParseUint(b, 10, 64) } -func (r *Reader) ReadInt() (int64, error) { - b, err := r.ReadTmpBytesReply() +func (r *Reader) ReadFloatReply() (float64, error) { + b, err := r.readTmpBytesReply() if err != nil { return 0, err } - return util.ParseInt(b, 10, 64) + return util.ParseFloat(b, 64) } -func (r *Reader) ReadUint() (uint64, error) { - b, err := r.ReadTmpBytesReply() +func (r *Reader) readTmpBytesReply() ([]byte, error) { + line, err := r.ReadLine() if err != nil { - return 0, err + return nil, err + } + switch line[0] { + case ErrorReply: + return nil, ParseErrorReply(line) + case StringReply: + return r._readTmpBytesReply(line) + case StatusReply: + return line[1:], nil + default: + return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line) } - return util.ParseUint(b, 10, 64) } -// -------------------------------------------------------------------- +func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) { + if isNilReply(line) { + return nil, Nil + } -func readN(r io.Reader, b []byte, n int) ([]byte, error) { - if n == 0 && b == nil { - return make([]byte, 0), nil + replyLen, err := strconv.Atoi(string(line[1:])) + if err != nil { + return nil, err } - if cap(b) >= n { - b = b[:n] - _, err := io.ReadFull(r, b) - return b, err + buf := r.buf(replyLen + 2) + _, err = io.ReadFull(r.rd, buf) + if err != nil { + return nil, err } - b = b[:cap(b)] - pos := 0 - for pos < n { - diff := n - len(b) - if diff > bytesAllocLimit { - diff = bytesAllocLimit - } - b = append(b, make([]byte, diff)...) + return buf[:replyLen], nil +} - nn, err := io.ReadFull(r, b[pos:]) - if err != nil { - return nil, err - } - pos += nn +func (r *Reader) buf(n int) []byte { + if d := n - cap(r._buf); d > 0 { + r._buf = append(r._buf, make([]byte, d)...) } - - return b, nil + return r._buf[:n] } func isNilReply(b []byte) bool { @@ -304,10 +282,6 @@ func ParseErrorReply(line []byte) error { return RedisError(string(line[1:])) } -func parseTmpStatusReply(line []byte) []byte { - return line[1:] -} - func parseArrayLen(line []byte) (int64, error) { if isNilReply(line) { return 0, Nil diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go deleted file mode 100644 index 664f4c33..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/internal/proto/write_buffer.go +++ /dev/null @@ -1,113 +0,0 @@ -package proto - -import ( - "encoding" - "fmt" - "strconv" -) - -type WriteBuffer struct { - b []byte -} - -func NewWriteBuffer() *WriteBuffer { - return &WriteBuffer{ - b: make([]byte, 0, 4096), - } -} - -func (w *WriteBuffer) Len() int { return len(w.b) } -func (w *WriteBuffer) Bytes() []byte { return w.b } -func (w *WriteBuffer) Reset() { w.b = w.b[:0] } - -func (w *WriteBuffer) Append(args []interface{}) error { - w.b = append(w.b, ArrayReply) - w.b = strconv.AppendUint(w.b, uint64(len(args)), 10) - w.b = append(w.b, '\r', '\n') - - for _, arg := range args { - if err := w.append(arg); err != nil { - return err - } - } - return nil -} - -func (w *WriteBuffer) append(val interface{}) error { - switch v := val.(type) { - case nil: - w.AppendString("") - case string: - w.AppendString(v) - case []byte: - w.AppendBytes(v) - case int: - w.AppendString(formatInt(int64(v))) - case int8: - w.AppendString(formatInt(int64(v))) - case int16: - w.AppendString(formatInt(int64(v))) - case int32: - w.AppendString(formatInt(int64(v))) - case int64: - w.AppendString(formatInt(v)) - case uint: - w.AppendString(formatUint(uint64(v))) - case uint8: - w.AppendString(formatUint(uint64(v))) - case uint16: - w.AppendString(formatUint(uint64(v))) - case uint32: - w.AppendString(formatUint(uint64(v))) - case uint64: - w.AppendString(formatUint(v)) - case float32: - w.AppendString(formatFloat(float64(v))) - case float64: - w.AppendString(formatFloat(v)) - case bool: - if v { - w.AppendString("1") - } else { - w.AppendString("0") - } - case encoding.BinaryMarshaler: - b, err := v.MarshalBinary() - if err != nil { - return err - } - w.AppendBytes(b) - default: - return fmt.Errorf( - "redis: can't marshal %T (consider implementing encoding.BinaryMarshaler)", val) - } - return nil -} - -func (w *WriteBuffer) AppendString(s string) { - w.b = append(w.b, StringReply) - w.b = strconv.AppendUint(w.b, uint64(len(s)), 10) - w.b = append(w.b, '\r', '\n') - w.b = append(w.b, s...) - w.b = append(w.b, '\r', '\n') -} - -func (w *WriteBuffer) AppendBytes(p []byte) { - w.b = append(w.b, StringReply) - w.b = strconv.AppendUint(w.b, uint64(len(p)), 10) - w.b = append(w.b, '\r', '\n') - w.b = append(w.b, p...) - w.b = append(w.b, '\r', '\n') -} - -func formatInt(n int64) string { - return strconv.FormatInt(n, 10) -} - -func formatUint(u uint64) string { - return strconv.FormatUint(u, 10) -} - -func formatFloat(f float64) string { - return strconv.FormatFloat(f, 'f', -1, 64) -} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go new file mode 100644 index 00000000..d106ce0e --- /dev/null +++ b/src/dma/vendor/github.com/go-redis/redis/internal/proto/writer.go @@ -0,0 +1,159 @@ +package proto + +import ( + "bufio" + "encoding" + "fmt" + "io" + "strconv" + + "github.com/go-redis/redis/internal/util" +) + +type Writer struct { + wr *bufio.Writer + + lenBuf []byte + numBuf []byte +} + +func NewWriter(wr io.Writer) *Writer { + return &Writer{ + wr: bufio.NewWriter(wr), + + lenBuf: make([]byte, 64), + numBuf: make([]byte, 64), + } +} + +func (w *Writer) WriteArgs(args []interface{}) error { + err := w.wr.WriteByte(ArrayReply) + if err != nil { + return err + } + + err = w.writeLen(len(args)) + if err != nil { + return err + } + + for _, arg := range args { + err := w.writeArg(arg) + if err != nil { + return err + } + } + + return nil +} + +func (w *Writer) writeLen(n int) error { + w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10) + w.lenBuf = append(w.lenBuf, '\r', '\n') + _, err := w.wr.Write(w.lenBuf) + return err +} + +func (w *Writer) writeArg(v interface{}) error { + switch v := v.(type) { + case nil: + return w.string("") + case string: + return w.string(v) + case []byte: + return w.bytes(v) + case int: + return w.int(int64(v)) + case int8: + return w.int(int64(v)) + case int16: + return w.int(int64(v)) + case int32: + return w.int(int64(v)) + case int64: + return w.int(v) + case uint: + return w.uint(uint64(v)) + case uint8: + return w.uint(uint64(v)) + case uint16: + return w.uint(uint64(v)) + case uint32: + return w.uint(uint64(v)) + case uint64: + return w.uint(v) + case float32: + return w.float(float64(v)) + case float64: + return w.float(v) + case bool: + if v { + return w.int(1) + } else { + return w.int(0) + } + case encoding.BinaryMarshaler: + b, err := v.MarshalBinary() + if err != nil { + return err + } + return w.bytes(b) + default: + return fmt.Errorf( + "redis: can't marshal %T (implement encoding.BinaryMarshaler)", v) + } +} + +func (w *Writer) bytes(b []byte) error { + err := w.wr.WriteByte(StringReply) + if err != nil { + return err + } + + err = w.writeLen(len(b)) + if err != nil { + return err + } + + _, err = w.wr.Write(b) + if err != nil { + return err + } + + return w.crlf() +} + +func (w *Writer) string(s string) error { + return w.bytes(util.StringToBytes(s)) +} + +func (w *Writer) uint(n uint64) error { + w.numBuf = strconv.AppendUint(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) int(n int64) error { + w.numBuf = strconv.AppendInt(w.numBuf[:0], n, 10) + return w.bytes(w.numBuf) +} + +func (w *Writer) float(f float64) error { + w.numBuf = strconv.AppendFloat(w.numBuf[:0], f, 'f', -1, 64) + return w.bytes(w.numBuf) +} + +func (w *Writer) crlf() error { + err := w.wr.WriteByte('\r') + if err != nil { + return err + } + return w.wr.WriteByte('\n') +} + +func (w *Writer) Reset(wr io.Writer) { + w.wr.Reset(wr) +} + +func (w *Writer) Flush() error { + return w.wr.Flush() +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go b/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go deleted file mode 100644 index 3b174172..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/internal/singleflight/singleflight.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight - -import "sync" - -// call is an in-flight or completed Do call -type call struct { - wg sync.WaitGroup - val interface{} - err error -} - -// Group represents a class of work and forms a namespace in which -// units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - g.mu.Unlock() - c.wg.Wait() - return c.val, c.err - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - c.val, c.err = fn() - c.wg.Done() - - g.mu.Lock() - delete(g.m, key) - g.mu.Unlock() - - return c.val, c.err -} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go index cd891833..1b3060eb 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/safe.go @@ -5,3 +5,7 @@ package util func BytesToString(b []byte) string { return string(b) } + +func StringToBytes(s string) []byte { + return []byte(s) +} diff --git a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go index 93a89c55..c9868aac 100644 --- a/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go +++ b/src/dma/vendor/github.com/go-redis/redis/internal/util/unsafe.go @@ -10,3 +10,13 @@ import ( func BytesToString(b []byte) string { return *(*string)(unsafe.Pointer(&b)) } + +// StringToBytes converts string to byte slice. +func StringToBytes(s string) []byte { + return *(*[]byte)(unsafe.Pointer( + &struct { + string + Cap int + }{s, len(s)}, + )) +} diff --git a/src/dma/vendor/github.com/go-redis/redis/options.go b/src/dma/vendor/github.com/go-redis/redis/options.go index 8a82d590..b6fabf3f 100644 --- a/src/dma/vendor/github.com/go-redis/redis/options.go +++ b/src/dma/vendor/github.com/go-redis/redis/options.go @@ -14,6 +14,17 @@ import ( "github.com/go-redis/redis/internal/pool" ) +// Limiter is the interface of a rate limiter or a circuit breaker. +type Limiter interface { + // Allow returns a nil if operation is allowed or an error otherwise. + // If operation is allowed client must report the result of operation + // whether is a success or a failure. + Allow() error + // ReportResult reports the result of previously allowed operation. + // nil indicates a success, non-nil error indicates a failure. + ReportResult(result error) +} + type Options struct { // The network type, either tcp or unix. // Default is tcp. @@ -48,7 +59,7 @@ type Options struct { // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. If reached, commands will fail - // with a timeout instead of blocking. + // with a timeout instead of blocking. Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. If reached, commands will fail @@ -59,6 +70,12 @@ type Options struct { // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int + // Minimum number of idle connections which is useful when establishing + // new connection is slow. + MinIdleConns int + // Connection age at which client retires (closes) the connection. + // Default is to not close aged connections. + MaxConnAge time.Duration // Amount of time client waits for connection if all connections // are busy before returning an error. // Default is ReadTimeout + 1 second. @@ -69,7 +86,8 @@ type Options struct { IdleTimeout time.Duration // Frequency of idle checks made by idle connections reaper. // Default is 1 minute. -1 disables idle connections reaper, - // but idle connections are still discarded by the client. + // but idle connections are still discarded by the client + // if IdleTimeout is set. IdleCheckFrequency time.Duration // Enables read only queries on slave nodes. @@ -83,6 +101,9 @@ func (opt *Options) init() { if opt.Network == "" { opt.Network = "tcp" } + if opt.Addr == "" { + opt.Addr = "localhost:6379" + } if opt.Dialer == nil { opt.Dialer = func() (net.Conn, error) { netDialer := &net.Dialer{ @@ -196,6 +217,8 @@ func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{ Dialer: opt.Dialer, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, diff --git a/src/dma/vendor/github.com/go-redis/redis/parser.go b/src/dma/vendor/github.com/go-redis/redis/parser.go deleted file mode 100644 index f0dc67f0..00000000 --- a/src/dma/vendor/github.com/go-redis/redis/parser.go +++ /dev/null @@ -1,394 +0,0 @@ -package redis - -import ( - "fmt" - "net" - "strconv" - "time" - - "github.com/go-redis/redis/internal/proto" -) - -// Implements proto.MultiBulkParse -func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { - vals := make([]interface{}, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(sliceParser) - if err != nil { - if err == Nil { - vals = append(vals, nil) - continue - } - if err, ok := err.(proto.RedisError); ok { - vals = append(vals, err) - continue - } - return nil, err - } - - switch v := v.(type) { - case []byte: - vals = append(vals, string(v)) - default: - vals = append(vals, v) - } - } - return vals, nil -} - -// Implements proto.MultiBulkParse -func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - bools := make([]bool, 0, n) - for i := int64(0); i < n; i++ { - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - bools = append(bools, n == 1) - } - return bools, nil -} - -// Implements proto.MultiBulkParse -func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - ss := make([]string, 0, n) - for i := int64(0); i < n; i++ { - s, err := rd.ReadStringReply() - if err == Nil { - ss = append(ss, "") - } else if err != nil { - return nil, err - } else { - ss = append(ss, s) - } - } - return ss, nil -} - -// Implements proto.MultiBulkParse -func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]string, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - value, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - m[key] = value - } - return m, nil -} - -// Implements proto.MultiBulkParse -func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]int64, n/2) - for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - n, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - m[key] = n - } - return m, nil -} - -// Implements proto.MultiBulkParse -func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]struct{}, n) - for i := int64(0); i < n; i++ { - key, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - m[key] = struct{}{} - } - return m, nil -} - -// Implements proto.MultiBulkParse -func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - zz := make([]Z, n/2) - for i := int64(0); i < n; i += 2 { - var err error - - z := &zz[i/2] - - z.Member, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - - z.Score, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - return zz, nil -} - -// Implements proto.MultiBulkParse -func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { - slots := make([]ClusterSlot, n) - for i := 0; i < len(slots); i++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n < 2 { - err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) - return nil, err - } - - start, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - end, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - - nodes := make([]ClusterNode, n-2) - for j := 0; j < len(nodes); j++ { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 && n != 3 { - err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) - return nil, err - } - - ip, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - - port, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - nodes[j].Addr = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) - - if n == 3 { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } - nodes[j].Id = id - } - } - - slots[i] = ClusterSlot{ - Start: int(start), - End: int(end), - Nodes: nodes, - } - } - return slots, nil -} - -func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - var loc GeoLocation - var err error - - loc.Name, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - if q.WithDist { - loc.Dist, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - if q.WithGeoHash { - loc.GeoHash, err = rd.ReadIntReply() - if err != nil { - return nil, err - } - } - if q.WithCoord { - n, err := rd.ReadArrayLen() - if err != nil { - return nil, err - } - if n != 2 { - return nil, fmt.Errorf("got %d coordinates, expected 2", n) - } - - loc.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - loc.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - } - - return &loc, nil - } -} - -func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { - return func(rd *proto.Reader, n int64) (interface{}, error) { - locs := make([]GeoLocation, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(newGeoLocationParser(q)) - if err != nil { - return nil, err - } - switch vv := v.(type) { - case []byte: - locs = append(locs, GeoLocation{ - Name: string(vv), - }) - case *GeoLocation: - locs = append(locs, *vv) - default: - return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) - } - } - return locs, nil - } -} - -func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { - var pos GeoPos - var err error - - pos.Longitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - pos.Latitude, err = rd.ReadFloatReply() - if err != nil { - return nil, err - } - - return &pos, nil -} - -func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - positions := make([]*GeoPos, 0, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(geoPosParser) - if err != nil { - if err == Nil { - positions = append(positions, nil) - continue - } - return nil, err - } - switch v := v.(type) { - case *GeoPos: - positions = append(positions, v) - default: - return nil, fmt.Errorf("got %T, expected *GeoPos", v) - } - } - return positions, nil -} - -func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { - var cmd CommandInfo - var err error - - if n != 6 { - return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) - } - - cmd.Name, err = rd.ReadStringReply() - if err != nil { - return nil, err - } - - arity, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.Arity = int8(arity) - - flags, err := rd.ReadReply(stringSliceParser) - if err != nil { - return nil, err - } - cmd.Flags = flags.([]string) - - firstKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.FirstKeyPos = int8(firstKeyPos) - - lastKeyPos, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.LastKeyPos = int8(lastKeyPos) - - stepCount, err := rd.ReadIntReply() - if err != nil { - return nil, err - } - cmd.StepCount = int8(stepCount) - - for _, flag := range cmd.Flags { - if flag == "readonly" { - cmd.ReadOnly = true - break - } - } - - return &cmd, nil -} - -// Implements proto.MultiBulkParse -func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - m := make(map[string]*CommandInfo, n) - for i := int64(0); i < n; i++ { - v, err := rd.ReadReply(commandInfoParser) - if err != nil { - return nil, err - } - vv := v.(*CommandInfo) - m[vv.Name] = vv - - } - return m, nil -} - -// Implements proto.MultiBulkParse -func timeParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d elements, expected 2", n) - } - - sec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - microsec, err := rd.ReadInt() - if err != nil { - return nil, err - } - - return time.Unix(sec, microsec*1000), nil -} diff --git a/src/dma/vendor/github.com/go-redis/redis/pipeline.go b/src/dma/vendor/github.com/go-redis/redis/pipeline.go index ba852283..b3a8844a 100644 --- a/src/dma/vendor/github.com/go-redis/redis/pipeline.go +++ b/src/dma/vendor/github.com/go-redis/redis/pipeline.go @@ -10,6 +10,7 @@ type pipelineExecer func([]Cmder) error type Pipeliner interface { StatefulCmdable + Do(args ...interface{}) *Cmd Process(cmd Cmder) error Close() error Discard() error @@ -31,6 +32,12 @@ type Pipeline struct { closed bool } +func (c *Pipeline) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + _ = c.Process(cmd) + return cmd +} + // Process queues the cmd for later execution. func (c *Pipeline) Process(cmd Cmder) error { c.mu.Lock() diff --git a/src/dma/vendor/github.com/go-redis/redis/pubsub.go b/src/dma/vendor/github.com/go-redis/redis/pubsub.go index 2cfcd150..0afb47cd 100644 --- a/src/dma/vendor/github.com/go-redis/redis/pubsub.go +++ b/src/dma/vendor/github.com/go-redis/redis/pubsub.go @@ -1,15 +1,19 @@ package redis import ( + "errors" "fmt" "sync" "time" "github.com/go-redis/redis/internal" "github.com/go-redis/redis/internal/pool" + "github.com/go-redis/redis/internal/proto" ) -// PubSub implements Pub/Sub commands as described in +var errPingTimeout = errors.New("redis: ping timeout") + +// PubSub implements Pub/Sub commands bas described in // http://redis.io/topics/pubsub. Message receiving is NOT safe // for concurrent use by multiple goroutines. // @@ -46,15 +50,17 @@ func (c *PubSub) conn() (*pool.Conn, error) { return cn, err } -func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { +func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } - if c.cn != nil { return c.cn, nil } + channels := mapKeys(c.channels) + channels = append(channels, newChannels...) + cn, err := c.newConn(channels) if err != nil { return nil, err @@ -69,20 +75,24 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { return cn, nil } +func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error { + return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) +} + func (c *PubSub) resubscribe(cn *pool.Conn) error { var firstErr error if len(c.channels) > 0 { - channels := mapKeys(c.channels) - err := c._subscribe(cn, "subscribe", channels...) + err := c._subscribe(cn, "subscribe", mapKeys(c.channels)) if err != nil && firstErr == nil { firstErr = err } } if len(c.patterns) > 0 { - patterns := mapKeys(c.patterns) - err := c._subscribe(cn, "psubscribe", patterns...) + err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns)) if err != nil && firstErr == nil { firstErr = err } @@ -101,51 +111,48 @@ func mapKeys(m map[string]struct{}) []string { return s } -func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error { - args := make([]interface{}, 1+len(channels)) - args[0] = redisCmd - for i, channel := range channels { - args[1+i] = channel +func (c *PubSub) _subscribe( + cn *pool.Conn, redisCmd string, channels []string, +) error { + args := make([]interface{}, 0, 1+len(channels)) + args = append(args, redisCmd) + for _, channel := range channels { + args = append(args, channel) } cmd := NewSliceCmd(args...) - - cn.SetWriteTimeout(c.opt.WriteTimeout) - return writeCmd(cn, cmd) + return c.writeCmd(cn, cmd) } -func (c *PubSub) releaseConn(cn *pool.Conn, err error) { +func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) { c.mu.Lock() - c._releaseConn(cn, err) + c._releaseConn(cn, err, allowTimeout) c.mu.Unlock() } -func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { +func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) { if c.cn != cn { return } - if internal.IsBadConn(err, true) { - c._reconnect() + if internal.IsBadConn(err, allowTimeout) { + c._reconnect(err) } } -func (c *PubSub) _closeTheCn() error { - var err error - if c.cn != nil { - err = c.closeConn(c.cn) - c.cn = nil - } - return err -} - -func (c *PubSub) reconnect() { - c.mu.Lock() - c._reconnect() - c.mu.Unlock() +func (c *PubSub) _reconnect(reason error) { + _ = c._closeTheCn(reason) + _, _ = c._conn(nil) } -func (c *PubSub) _reconnect() { - _ = c._closeTheCn() - _, _ = c._conn(nil) +func (c *PubSub) _closeTheCn(reason error) error { + if c.cn == nil { + return nil + } + if !c.closed { + internal.Logf("redis: discarding bad PubSub connection: %s", reason) + } + err := c.closeConn(c.cn) + c.cn = nil + return err } func (c *PubSub) Close() error { @@ -158,7 +165,7 @@ func (c *PubSub) Close() error { c.closed = true close(c.exit) - err := c._closeTheCn() + err := c._closeTheCn(pool.ErrClosed) return err } @@ -172,8 +179,8 @@ func (c *PubSub) Subscribe(channels ...string) error { if c.channels == nil { c.channels = make(map[string]struct{}) } - for _, channel := range channels { - c.channels[channel] = struct{}{} + for _, s := range channels { + c.channels[s] = struct{}{} } return err } @@ -188,8 +195,8 @@ func (c *PubSub) PSubscribe(patterns ...string) error { if c.patterns == nil { c.patterns = make(map[string]struct{}) } - for _, pattern := range patterns { - c.patterns[pattern] = struct{}{} + for _, s := range patterns { + c.patterns[s] = struct{}{} } return err } @@ -200,10 +207,10 @@ func (c *PubSub) Unsubscribe(channels ...string) error { c.mu.Lock() defer c.mu.Unlock() - err := c.subscribe("unsubscribe", channels...) for _, channel := range channels { delete(c.channels, channel) } + err := c.subscribe("unsubscribe", channels...) return err } @@ -213,10 +220,10 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error { c.mu.Lock() defer c.mu.Unlock() - err := c.subscribe("punsubscribe", patterns...) for _, pattern := range patterns { delete(c.patterns, pattern) } + err := c.subscribe("punsubscribe", patterns...) return err } @@ -226,8 +233,8 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error { return err } - err = c._subscribe(cn, redisCmd, channels...) - c._releaseConn(cn, err) + err = c._subscribe(cn, redisCmd, channels) + c._releaseConn(cn, err, false) return err } @@ -243,9 +250,8 @@ func (c *PubSub) Ping(payload ...string) error { return err } - cn.SetWriteTimeout(c.opt.WriteTimeout) - err = writeCmd(cn, cmd) - c.releaseConn(cn, err) + err = c.writeCmd(cn, cmd) + c.releaseConn(cn, err, false) return err } @@ -336,9 +342,11 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { return nil, err } - cn.SetReadTimeout(timeout) - err = c.cmd.readReply(cn) - c.releaseConn(cn, err) + err = cn.WithReader(timeout, func(rd *proto.Reader) error { + return c.cmd.readReply(rd) + }) + + c.releaseConn(cn, err, timeout > 0) if err != nil { return nil, err } @@ -432,21 +440,26 @@ func (c *PubSub) initChannel() { timer := time.NewTimer(timeout) timer.Stop() - var hasPing bool + healthy := true for { timer.Reset(timeout) select { case <-c.ping: - hasPing = true + healthy = true if !timer.Stop() { <-timer.C } case <-timer.C: - if hasPing { - hasPing = false - _ = c.Ping() + pingErr := c.Ping() + if healthy { + healthy = false } else { - c.reconnect() + if pingErr == nil { + pingErr = errPingTimeout + } + c.mu.Lock() + c._reconnect(pingErr) + c.mu.Unlock() } case <-c.exit: return diff --git a/src/dma/vendor/github.com/go-redis/redis/redis.go b/src/dma/vendor/github.com/go-redis/redis/redis.go index c0f142cc..aca30648 100644 --- a/src/dma/vendor/github.com/go-redis/redis/redis.go +++ b/src/dma/vendor/github.com/go-redis/redis/redis.go @@ -26,6 +26,7 @@ func SetLogger(logger *log.Logger) { type baseClient struct { opt *Options connPool pool.Pooler + limiter Limiter process func(Cmder) error processPipeline func([]Cmder) error @@ -50,7 +51,7 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { if err := c.initConn(cn); err != nil { _ = c.connPool.CloseConn(cn) return nil, err @@ -61,12 +62,30 @@ func (c *baseClient) newConn() (*pool.Conn, error) { } func (c *baseClient) getConn() (*pool.Conn, error) { + if c.limiter != nil { + err := c.limiter.Allow() + if err != nil { + return nil, err + } + } + + cn, err := c._getConn() + if err != nil { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + return nil, err + } + return cn, nil +} + +func (c *baseClient) _getConn() (*pool.Conn, error) { cn, err := c.connPool.Get() if err != nil { return nil, err } - if !cn.Inited { + if cn.InitedAt.IsZero() { err := c.initConn(cn) if err != nil { c.connPool.Remove(cn) @@ -77,18 +96,32 @@ func (c *baseClient) getConn() (*pool.Conn, error) { return cn, nil } -func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { +func (c *baseClient) releaseConn(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) + } + if internal.IsBadConn(err, false) { c.connPool.Remove(cn) - return false + } else { + c.connPool.Put(cn) + } +} + +func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) { + if c.limiter != nil { + c.limiter.ReportResult(err) } - c.connPool.Put(cn) - return true + if err == nil || internal.IsRedisError(err) { + c.connPool.Put(cn) + } else { + c.connPool.Remove(cn) + } } func (c *baseClient) initConn(cn *pool.Conn) error { - cn.Inited = true + cn.InitedAt = time.Now() if c.opt.Password == "" && c.opt.DB == 0 && @@ -123,8 +156,17 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } +// Do creates a Cmd from the args and processes the cmd. +func (c *baseClient) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + _ = c.Process(cmd) + return cmd +} + // WrapProcess wraps function that processes Redis commands. -func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { +func (c *baseClient) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { c.process = fn(c.process) } @@ -147,8 +189,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmd); err != nil { + err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmd) + }) + if err != nil { c.releaseConn(cn, err) cmd.setErr(err) if internal.IsRetryableError(err, true) { @@ -157,8 +201,9 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return err } - cn.SetReadTimeout(c.cmdTimeout(cmd)) - err = cmd.readReply(cn) + err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error { + return cmd.readReply(rd) + }) c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) { continue @@ -176,7 +221,11 @@ func (c *baseClient) retryBackoff(attempt int) time.Duration { func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { - return readTimeout(*timeout) + t := *timeout + if t == 0 { + return 0 + } + return t + 10*time.Second } return c.opt.ReadTimeout } @@ -232,35 +281,33 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e } canRetry, err := p(cn, cmds) - - if err == nil || internal.IsRedisError(err) { - c.connPool.Put(cn) - break - } - c.connPool.Remove(cn) + c.releaseConnStrict(cn, err) if !canRetry || !internal.IsRetryableError(err, true) { break } } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmds...); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return writeCmd(wr, cmds...) + }) + if err != nil { setCmdsErr(cmds, err) return true, err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - return true, pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + return pipelineReadCmds(rd, cmds) + }) + return true, err } -func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { +func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { for _, cmd := range cmds { - err := cmd.readReply(cn) + err := cmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } @@ -269,47 +316,50 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { } func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { - cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { + err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error { + return txPipelineWriteMulti(wr, cmds) + }) + if err != nil { setCmdsErr(cmds, err) return true, err } - // Set read timeout for all commands. - cn.SetReadTimeout(c.opt.ReadTimeout) - - if err := c.txPipelineReadQueued(cn, cmds); err != nil { - setCmdsErr(cmds, err) - return false, err - } - - return false, pipelineReadCmds(cn, cmds) + err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error { + err := txPipelineReadQueued(rd, cmds) + if err != nil { + setCmdsErr(cmds, err) + return err + } + return pipelineReadCmds(rd, cmds) + }) + return false, err } -func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { +func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error { multiExec := make([]Cmder, 0, len(cmds)+2) multiExec = append(multiExec, NewStatusCmd("MULTI")) multiExec = append(multiExec, cmds...) multiExec = append(multiExec, NewSliceCmd("EXEC")) - return writeCmd(cn, multiExec...) + return writeCmd(wr, multiExec...) } -func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { +func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error { // Parse queued replies. var statusCmd StatusCmd - if err := statusCmd.readReply(cn); err != nil { + err := statusCmd.readReply(rd) + if err != nil { return err } - for _ = range cmds { - err := statusCmd.readReply(cn) + for range cmds { + err = statusCmd.readReply(rd) if err != nil && !internal.IsRedisError(err) { return err } } // Parse number of replies. - line, err := cn.Rd.ReadLine() + line, err := rd.ReadLine() if err != nil { if err == Nil { err = TxFailedErr @@ -373,12 +423,12 @@ func (c *Client) WithContext(ctx context.Context) *Client { if ctx == nil { panic("nil context") } - c2 := c.copy() + c2 := c.clone() c2.ctx = ctx return c2 } -func (c *Client) copy() *Client { +func (c *Client) clone() *Client { cp := *c cp.init() return &cp @@ -389,6 +439,11 @@ func (c *Client) Options() *Options { return c.opt } +func (c *Client) SetLimiter(l Limiter) *Client { + c.limiter = l + return c +} + type PoolStats pool.Stats // PoolStats returns connection pool stats. @@ -437,6 +492,30 @@ func (c *Client) pubSub() *PubSub { // Subscribe subscribes the client to the specified channels. // Channels can be omitted to create empty subscription. +// Note that this method does not wait on a response from Redis, so the +// subscription may not be active immediately. To force the connection to wait, +// you may call the Receive() method on the returned *PubSub like so: +// +// sub := client.Subscribe(queryResp) +// iface, err := sub.Receive() +// if err != nil { +// // handle error +// } +// +// // Should be *Subscription, but others are possible if other actions have been +// // taken on sub since it was created. +// switch iface.(type) { +// case *Subscription: +// // subscribe succeeded +// case *Message: +// // received first message +// case *Pong: +// // pong received +// default: +// // handle error +// } +// +// ch := sub.Channel() func (c *Client) Subscribe(channels ...string) *PubSub { pubsub := c.pubSub() if len(channels) > 0 { diff --git a/src/dma/vendor/github.com/go-redis/redis/result.go b/src/dma/vendor/github.com/go-redis/redis/result.go index e086e8e3..e438f260 100644 --- a/src/dma/vendor/github.com/go-redis/redis/result.go +++ b/src/dma/vendor/github.com/go-redis/redis/result.go @@ -53,7 +53,7 @@ func NewBoolResult(val bool, err error) *BoolCmd { // NewStringResult returns a StringCmd initialised with val and err for testing func NewStringResult(val string, err error) *StringCmd { var cmd StringCmd - cmd.val = []byte(val) + cmd.val = val cmd.setErr(err) return &cmd } diff --git a/src/dma/vendor/github.com/go-redis/redis/ring.go b/src/dma/vendor/github.com/go-redis/redis/ring.go index ef855115..250e5f64 100644 --- a/src/dma/vendor/github.com/go-redis/redis/ring.go +++ b/src/dma/vendor/github.com/go-redis/redis/ring.go @@ -68,6 +68,8 @@ type RingOptions struct { WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options { WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, + MinIdleConns: opt.MinIdleConns, + MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, @@ -315,12 +319,12 @@ func (c *ringShards) Close() error { //------------------------------------------------------------------------------ -// Ring is a Redis client that uses constistent hashing to distribute +// Ring is a Redis client that uses consistent hashing to distribute // keys across multiple Redis servers (shards). It's safe for // concurrent use by multiple goroutines. // // Ring monitors the state of each shard and removes dead shards from -// the ring. When shard comes online it is added back to the ring. This +// the ring. When a shard comes online it is added back to the ring. This // gives you maximum availability and partition tolerance, but no // consistency between different shards or even clients. Each client // uses shards that are available to the client and does not do any @@ -338,6 +342,7 @@ type Ring struct { shards *ringShards cmdsInfoCache *cmdsInfoCache + process func(Cmder) error processPipeline func([]Cmder) error } @@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring { } ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo) + ring.process = ring.defaultProcess ring.processPipeline = ring.defaultProcessPipeline ring.cmdable.setProcessor(ring.Process) @@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats { acc.Misses += s.Misses acc.Timeouts += s.Timeouts acc.TotalConns += s.TotalConns - acc.FreeConns += s.FreeConns + acc.IdleConns += s.IdleConns } return &acc } @@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) { return c.shards.GetByKey(firstKey) } -func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) { - c.ForEachShard(func(c *Client) error { - c.WrapProcess(fn) - return nil - }) +// Do creates a Cmd from the args and processes the cmd. +func (c *Ring) Do(args ...interface{}) *Cmd { + cmd := NewCmd(args...) + c.Process(cmd) + return cmd +} + +func (c *Ring) WrapProcess( + fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error, +) { + c.process = fn(c.process) } func (c *Ring) Process(cmd Cmder) error { - shard, err := c.cmdShard(cmd) - if err != nil { - cmd.setErr(err) - return err + return c.process(cmd) +} + +func (c *Ring) defaultProcess(cmd Cmder) error { + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) + } + + shard, err := c.cmdShard(cmd) + if err != nil { + cmd.setErr(err) + return err + } + + err = shard.Client.Process(cmd) + if err == nil { + return nil + } + if !internal.IsRetryableError(err, cmd.readTimeout() == nil) { + return err + } } - return shard.Client.Process(cmd) + return cmd.Err() } func (c *Ring) Pipeline() Pipeliner { @@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { time.Sleep(c.retryBackoff(attempt)) } + var mu sync.Mutex var failedCmdsMap map[string][]Cmder + var wg sync.WaitGroup for hash, cmds := range cmdsMap { - shard, err := c.shards.GetByHash(hash) - if err != nil { - setCmdsErr(cmds, err) - continue - } + wg.Add(1) + go func(hash string, cmds []Cmder) { + defer wg.Done() + + shard, err := c.shards.GetByHash(hash) + if err != nil { + setCmdsErr(cmds, err) + return + } - cn, err := shard.Client.getConn() - if err != nil { - setCmdsErr(cmds, err) - continue - } + cn, err := shard.Client.getConn() + if err != nil { + setCmdsErr(cmds, err) + return + } - canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - if err == nil || internal.IsRedisError(err) { - shard.Client.connPool.Put(cn) - continue - } - shard.Client.connPool.Remove(cn) + canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) + shard.Client.releaseConnStrict(cn, err) - if canRetry && internal.IsRetryableError(err, true) { - if failedCmdsMap == nil { - failedCmdsMap = make(map[string][]Cmder) + if canRetry && internal.IsRetryableError(err, true) { + mu.Lock() + if failedCmdsMap == nil { + failedCmdsMap = make(map[string][]Cmder) + } + failedCmdsMap[hash] = cmds + mu.Unlock() } - failedCmdsMap[hash] = cmds - } + }(hash, cmds) } + wg.Wait() if len(failedCmdsMap) == 0 { break } cmdsMap = failedCmdsMap } - return firstCmdsErr(cmds) + return cmdsFirstErr(cmds) } func (c *Ring) TxPipeline() Pipeliner { diff --git a/src/dma/vendor/github.com/go-redis/redis/sentinel.go b/src/dma/vendor/github.com/go-redis/redis/sentinel.go index 12c29a71..7cbb90bd 100644 --- a/src/dma/vendor/github.com/go-redis/redis/sentinel.go +++ b/src/dma/vendor/github.com/go-redis/redis/sentinel.go @@ -29,13 +29,17 @@ type FailoverOptions struct { Password string DB int - MaxRetries int + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration @@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { }, } c.baseClient.init() - c.setProcessor(c.Process) + c.cmdable.setProcessor(c.Process) return &c } @@ -115,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { return c } -func (c *SentinelClient) PubSub() *PubSub { +func (c *SentinelClient) pubSub() *PubSub { pubsub := &PubSub{ opt: c.opt, @@ -128,14 +132,52 @@ func (c *SentinelClient) PubSub() *PubSub { return pubsub } +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *SentinelClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *SentinelClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub() + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd { - cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name) + cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name) c.Process(cmd) return cmd } func (c *SentinelClient) Sentinels(name string) *SliceCmd { - cmd := NewSliceCmd("SENTINEL", "sentinels", name) + cmd := NewSliceCmd("sentinel", "sentinels", name) + c.Process(cmd) + return cmd +} + +// Failover forces a failover as if the master was not reachable, and without +// asking for agreement to other Sentinels. +func (c *SentinelClient) Failover(name string) *StatusCmd { + cmd := NewStatusCmd("sentinel", "failover", name) + c.Process(cmd) + return cmd +} + +// Reset resets all the masters with matching name. The pattern argument is a +// glob-style pattern. The reset process clears any previous state in a master +// (including a failover in progress), and removes every slave and sentinel +// already discovered and associated with the master. +func (c *SentinelClient) Reset(pattern string) *IntCmd { + cmd := NewIntCmd("sentinel", "reset", pattern) c.Process(cmd) return cmd } @@ -152,79 +194,81 @@ type sentinelFailover struct { masterName string _masterAddr string sentinel *SentinelClient + pubsub *PubSub } -func (d *sentinelFailover) Close() error { - return d.resetSentinel() +func (c *sentinelFailover) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.sentinel != nil { + return c.closeSentinel() + } + return nil } -func (d *sentinelFailover) Pool() *pool.ConnPool { - d.poolOnce.Do(func() { - d.opt.Dialer = d.dial - d.pool = newConnPool(d.opt) +func (c *sentinelFailover) Pool() *pool.ConnPool { + c.poolOnce.Do(func() { + c.opt.Dialer = c.dial + c.pool = newConnPool(c.opt) }) - return d.pool + return c.pool } -func (d *sentinelFailover) dial() (net.Conn, error) { - addr, err := d.MasterAddr() +func (c *sentinelFailover) dial() (net.Conn, error) { + addr, err := c.MasterAddr() if err != nil { return nil, err } - return net.DialTimeout("tcp", addr, d.opt.DialTimeout) + return net.DialTimeout("tcp", addr, c.opt.DialTimeout) } -func (d *sentinelFailover) MasterAddr() (string, error) { - d.mu.Lock() - defer d.mu.Unlock() - - addr, err := d.masterAddr() +func (c *sentinelFailover) MasterAddr() (string, error) { + addr, err := c.masterAddr() if err != nil { return "", err } - d._switchMaster(addr) - + c.switchMaster(addr) return addr, nil } -func (d *sentinelFailover) masterAddr() (string, error) { - // Try last working sentinel. - if d.sentinel != nil { - addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result() - if err == nil { - addr := net.JoinHostPort(addr[0], addr[1]) - return addr, nil - } - - internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", - d.masterName, err) - d._resetSentinel() +func (c *sentinelFailover) masterAddr() (string, error) { + addr := c.getMasterAddr() + if addr != "" { + return addr, nil } - for i, sentinelAddr := range d.sentinelAddrs { + c.mu.Lock() + defer c.mu.Unlock() + + for i, sentinelAddr := range c.sentinelAddrs { sentinel := NewSentinelClient(&Options{ Addr: sentinelAddr, - DialTimeout: d.opt.DialTimeout, - ReadTimeout: d.opt.ReadTimeout, - WriteTimeout: d.opt.WriteTimeout, + MaxRetries: c.opt.MaxRetries, + + DialTimeout: c.opt.DialTimeout, + ReadTimeout: c.opt.ReadTimeout, + WriteTimeout: c.opt.WriteTimeout, - PoolSize: d.opt.PoolSize, - PoolTimeout: d.opt.PoolTimeout, - IdleTimeout: d.opt.IdleTimeout, + PoolSize: c.opt.PoolSize, + PoolTimeout: c.opt.PoolTimeout, + IdleTimeout: c.opt.IdleTimeout, + IdleCheckFrequency: c.opt.IdleCheckFrequency, + + TLSConfig: c.opt.TLSConfig, }) - masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result() + masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result() if err != nil { internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s", - d.masterName, err) - sentinel.Close() + c.masterName, err) + _ = sentinel.Close() continue } // Push working sentinel to the top. - d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0] - d.setSentinel(sentinel) + c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0] + c.setSentinel(sentinel) addr := net.JoinHostPort(masterAddr[0], masterAddr[1]) return addr, nil @@ -233,17 +277,41 @@ func (d *sentinelFailover) masterAddr() (string, error) { return "", errors.New("redis: all sentinels are unreachable") } -func (c *sentinelFailover) switchMaster(addr string) { - c.mu.Lock() - c._switchMaster(addr) - c.mu.Unlock() +func (c *sentinelFailover) getMasterAddr() string { + c.mu.RLock() + sentinel := c.sentinel + c.mu.RUnlock() + + if sentinel == nil { + return "" + } + + addr, err := sentinel.GetMasterAddrByName(c.masterName).Result() + if err != nil { + internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s", + c.masterName, err) + c.mu.Lock() + if c.sentinel == sentinel { + c.closeSentinel() + } + c.mu.Unlock() + return "" + } + + return net.JoinHostPort(addr[0], addr[1]) } -func (c *sentinelFailover) _switchMaster(addr string) { - if c._masterAddr == addr { +func (c *sentinelFailover) switchMaster(addr string) { + c.mu.RLock() + masterAddr := c._masterAddr + c.mu.RUnlock() + if masterAddr == addr { return } + c.mu.Lock() + defer c.mu.Unlock() + internal.Logf("sentinel: new master=%q addr=%q", c.masterName, addr) _ = c.Pool().Filter(func(cn *pool.Conn) bool { @@ -252,32 +320,36 @@ func (c *sentinelFailover) _switchMaster(addr string) { c._masterAddr = addr } -func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) { - d.discoverSentinels(sentinel) - d.sentinel = sentinel - go d.listen(sentinel) +func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) { + c.discoverSentinels(sentinel) + c.sentinel = sentinel + + c.pubsub = sentinel.Subscribe("+switch-master") + go c.listen(c.pubsub) } -func (d *sentinelFailover) resetSentinel() error { - var err error - d.mu.Lock() - if d.sentinel != nil { - err = d._resetSentinel() +func (c *sentinelFailover) closeSentinel() error { + var firstErr error + + err := c.pubsub.Close() + if err != nil && firstErr == err { + firstErr = err } - d.mu.Unlock() - return err -} + c.pubsub = nil -func (d *sentinelFailover) _resetSentinel() error { - err := d.sentinel.Close() - d.sentinel = nil - return err + err = c.sentinel.Close() + if err != nil && firstErr == err { + firstErr = err + } + c.sentinel = nil + + return firstErr } -func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { - sentinels, err := sentinel.Sentinels(d.masterName).Result() +func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { + sentinels, err := sentinel.Sentinels(c.masterName).Result() if err != nil { - internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err) + internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err) return } for _, sentinel := range sentinels { @@ -286,49 +358,33 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) { key := vals[i].(string) if key == "name" { sentinelAddr := vals[i+1].(string) - if !contains(d.sentinelAddrs, sentinelAddr) { - internal.Logf( - "sentinel: discovered new sentinel=%q for master=%q", - sentinelAddr, d.masterName, - ) - d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr) + if !contains(c.sentinelAddrs, sentinelAddr) { + internal.Logf("sentinel: discovered new sentinel=%q for master=%q", + sentinelAddr, c.masterName) + c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr) } } } } } -func (d *sentinelFailover) listen(sentinel *SentinelClient) { - pubsub := sentinel.PubSub() - defer pubsub.Close() - - err := pubsub.Subscribe("+switch-master") - if err != nil { - internal.Logf("sentinel: Subscribe failed: %s", err) - d.resetSentinel() - return - } - +func (c *sentinelFailover) listen(pubsub *PubSub) { + ch := pubsub.Channel() for { - msg, err := pubsub.ReceiveMessage() - if err != nil { - if err == pool.ErrClosed { - d.resetSentinel() - return - } - internal.Logf("sentinel: ReceiveMessage failed: %s", err) - continue + msg, ok := <-ch + if !ok { + break } switch msg.Channel { case "+switch-master": parts := strings.Split(msg.Payload, " ") - if parts[0] != d.masterName { + if parts[0] != c.masterName { internal.Logf("sentinel: ignore addr for master=%q", parts[0]) continue } addr := net.JoinHostPort(parts[3], parts[4]) - d.switchMaster(addr) + c.switchMaster(addr) } } } diff --git a/src/dma/vendor/github.com/go-redis/redis/tx.go b/src/dma/vendor/github.com/go-redis/redis/tx.go index 6a7da99d..fb3e6331 100644 --- a/src/dma/vendor/github.com/go-redis/redis/tx.go +++ b/src/dma/vendor/github.com/go-redis/redis/tx.go @@ -29,10 +29,10 @@ func (c *Client) newTx() *Tx { return &tx } -// Watch prepares a transcaction and marks the keys to be watched +// Watch prepares a transaction and marks the keys to be watched // for conditional execution if there are any keys. // -// The transaction is automatically closed when the fn exits. +// The transaction is automatically closed when fn exits. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error { tx := c.newTx() if len(keys) > 0 { diff --git a/src/dma/vendor/github.com/go-redis/redis/universal.go b/src/dma/vendor/github.com/go-redis/redis/universal.go index 9e30c81d..a6075624 100644 --- a/src/dma/vendor/github.com/go-redis/redis/universal.go +++ b/src/dma/vendor/github.com/go-redis/redis/universal.go @@ -12,35 +12,38 @@ type UniversalOptions struct { // of cluster/sentinel nodes. Addrs []string - // The sentinel master name. - // Only failover clients. - MasterName string - // Database to be selected after connecting to the server. // Only single-node and failover clients. DB int - // Only cluster clients. - - // Enables read only queries on slave nodes. - ReadOnly bool - - MaxRedirects int - RouteByLatency bool - - // Common options + // Common options. OnConnect func(*Conn) error - MaxRetries int Password string + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration PoolSize int + MinIdleConns int + MaxConnAge time.Duration PoolTimeout time.Duration IdleTimeout time.Duration IdleCheckFrequency time.Duration TLSConfig *tls.Config + + // Only cluster clients. + + MaxRedirects int + ReadOnly bool + RouteByLatency bool + RouteRandomly bool + + // The sentinel master name. + // Only failover clients. + MasterName string } func (o *UniversalOptions) cluster() *ClusterOptions { @@ -49,22 +52,31 @@ func (o *UniversalOptions) cluster() *ClusterOptions { } return &ClusterOptions{ - Addrs: o.Addrs, + Addrs: o.Addrs, + OnConnect: o.OnConnect, + + Password: o.Password, + MaxRedirects: o.MaxRedirects, - RouteByLatency: o.RouteByLatency, ReadOnly: o.ReadOnly, + RouteByLatency: o.RouteByLatency, + RouteRandomly: o.RouteRandomly, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, - OnConnect: o.OnConnect, - MaxRetries: o.MaxRetries, - Password: o.Password, DialTimeout: o.DialTimeout, ReadTimeout: o.ReadTimeout, WriteTimeout: o.WriteTimeout, PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, - TLSConfig: o.TLSConfig, + + TLSConfig: o.TLSConfig, } } @@ -76,19 +88,27 @@ func (o *UniversalOptions) failover() *FailoverOptions { return &FailoverOptions{ SentinelAddrs: o.Addrs, MasterName: o.MasterName, - DB: o.DB, + OnConnect: o.OnConnect, + + DB: o.DB, + Password: o.Password, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, - OnConnect: o.OnConnect, - MaxRetries: o.MaxRetries, - Password: o.Password, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, - TLSConfig: o.TLSConfig, + + TLSConfig: o.TLSConfig, } } @@ -99,20 +119,28 @@ func (o *UniversalOptions) simple() *Options { } return &Options{ - Addr: addr, - DB: o.DB, + Addr: addr, + OnConnect: o.OnConnect, + + DB: o.DB, + Password: o.Password, + + MaxRetries: o.MaxRetries, + MinRetryBackoff: o.MinRetryBackoff, + MaxRetryBackoff: o.MaxRetryBackoff, + + DialTimeout: o.DialTimeout, + ReadTimeout: o.ReadTimeout, + WriteTimeout: o.WriteTimeout, - OnConnect: o.OnConnect, - MaxRetries: o.MaxRetries, - Password: o.Password, - DialTimeout: o.DialTimeout, - ReadTimeout: o.ReadTimeout, - WriteTimeout: o.WriteTimeout, PoolSize: o.PoolSize, + MinIdleConns: o.MinIdleConns, + MaxConnAge: o.MaxConnAge, PoolTimeout: o.PoolTimeout, IdleTimeout: o.IdleTimeout, IdleCheckFrequency: o.IdleCheckFrequency, - TLSConfig: o.TLSConfig, + + TLSConfig: o.TLSConfig, } } diff --git a/src/dma/vendor/github.com/labstack/echo/.travis.yml b/src/dma/vendor/github.com/labstack/echo/.travis.yml index 05e53b16..30346d7f 100644 --- a/src/dma/vendor/github.com/labstack/echo/.travis.yml +++ b/src/dma/vendor/github.com/labstack/echo/.travis.yml @@ -1,12 +1,14 @@ language: go go: - - 1.9.x - - 1.10.x + - 1.11.x - tip +env: + - GO111MODULE=on install: - - make dependency + - go get -v golang.org/x/lint/golint script: - - make test + - golint -set_exit_status ./... + - go test -race -coverprofile=coverage.txt -covermode=atomic ./... after_success: - bash <(curl -s https://codecov.io/bash) matrix: diff --git a/src/dma/vendor/github.com/labstack/echo/Gopkg.lock b/src/dma/vendor/github.com/labstack/echo/Gopkg.lock deleted file mode 100644 index f3c3b8d2..00000000 --- a/src/dma/vendor/github.com/labstack/echo/Gopkg.lock +++ /dev/null @@ -1,75 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - name = "github.com/davecgh/go-spew" - packages = ["spew"] - revision = "346938d642f2ec3594ed81d874461961cd0faa76" - version = "v1.1.0" - -[[projects]] - name = "github.com/dgrijalva/jwt-go" - packages = ["."] - revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" - version = "v3.2.0" - -[[projects]] - name = "github.com/labstack/gommon" - packages = ["bytes","color","log","random"] - revision = "6fe1405d73ec4bd4cd8a4ac8e2a2b2bf95d03954" - version = "0.2.4" - -[[projects]] - name = "github.com/mattn/go-colorable" - packages = ["."] - revision = "167de6bfdfba052fa6b2d3664c8f5272e23c9072" - version = "v0.0.9" - -[[projects]] - name = "github.com/mattn/go-isatty" - packages = ["."] - revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39" - version = "v0.0.3" - -[[projects]] - name = "github.com/pmezard/go-difflib" - packages = ["difflib"] - revision = "792786c7400a136282c1664665ae0a8db921c6c2" - version = "v1.0.0" - -[[projects]] - name = "github.com/stretchr/testify" - packages = ["assert"] - revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71" - version = "v1.2.1" - -[[projects]] - branch = "master" - name = "github.com/valyala/bytebufferpool" - packages = ["."] - revision = "e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7" - -[[projects]] - branch = "master" - name = "github.com/valyala/fasttemplate" - packages = ["."] - revision = "dcecefd839c4193db0d35b88ec65b4c12d360ab0" - -[[projects]] - branch = "master" - name = "golang.org/x/crypto" - packages = ["acme","acme/autocert"] - revision = "182114d582623c1caa54f73de9c7224e23a48487" - -[[projects]] - branch = "master" - name = "golang.org/x/sys" - packages = ["unix"] - revision = "c28acc882ebcbfbe8ce9f0f14b9ac26ee138dd51" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - inputs-digest = "9c7b45e80fe353405800cf01f429b3a203cfb8d4468a04c64a908e11a98ea764" - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/src/dma/vendor/github.com/labstack/echo/Gopkg.toml b/src/dma/vendor/github.com/labstack/echo/Gopkg.toml deleted file mode 100644 index 61de60cb..00000000 --- a/src/dma/vendor/github.com/labstack/echo/Gopkg.toml +++ /dev/null @@ -1,42 +0,0 @@ - -# Gopkg.toml example -# -# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" - - -[[constraint]] - name = "github.com/dgrijalva/jwt-go" - version = "3.2.0" - -[[constraint]] - name = "github.com/labstack/gommon" - version = "0.2.4" - -[[constraint]] - name = "github.com/stretchr/testify" - version = "1.2.1" - -[[constraint]] - branch = "master" - name = "github.com/valyala/fasttemplate" - -[[constraint]] - branch = "master" - name = "golang.org/x/crypto" \ No newline at end of file diff --git a/src/dma/vendor/github.com/labstack/echo/Makefile b/src/dma/vendor/github.com/labstack/echo/Makefile index 494667d8..dfcb6c02 100644 --- a/src/dma/vendor/github.com/labstack/echo/Makefile +++ b/src/dma/vendor/github.com/labstack/echo/Makefile @@ -1,17 +1,3 @@ -DEP_VERSION=0.4.1 - -dependency: - curl -fsSL -o ${GOPATH}/bin/dep https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64 - chmod +x ${GOPATH}/bin/dep - dep ensure - -test: - echo "" > coverage.txt - for d in $(shell go list ./... | grep -v vendor); do \ - go test -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \ - [ -f profile.out ] && cat profile.out >> coverage.txt && rm profile.out; \ - done - tag: @git tag `grep -P '^\tversion = ' echo.go|cut -f2 -d'"'` @git tag|grep -v ^v diff --git a/src/dma/vendor/github.com/labstack/echo/README.md b/src/dma/vendor/github.com/labstack/echo/README.md index 0f609df4..49e4d3b1 100644 --- a/src/dma/vendor/github.com/labstack/echo/README.md +++ b/src/dma/vendor/github.com/labstack/echo/README.md @@ -32,7 +32,7 @@ Date: 2018/03/15
Source: https://github.com/vishr/web-framework-benchmark
Lower is better! - + ## [Guide](https://echo.labstack.com/guide) diff --git a/src/dma/vendor/github.com/labstack/echo/bind.go b/src/dma/vendor/github.com/labstack/echo/bind.go index 38e07150..4998e25b 100644 --- a/src/dma/vendor/github.com/labstack/echo/bind.go +++ b/src/dma/vendor/github.com/labstack/echo/bind.go @@ -31,9 +31,9 @@ type ( func (b *DefaultBinder) Bind(i interface{}, c Context) (err error) { req := c.Request() if req.ContentLength == 0 { - if req.Method == GET || req.Method == DELETE { + if req.Method == http.MethodGet || req.Method == http.MethodDelete { if err = b.bindData(i, c.QueryParams(), "query"); err != nil { - return NewHTTPError(http.StatusBadRequest, err.Error()) + return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err) } return } @@ -44,30 +44,32 @@ func (b *DefaultBinder) Bind(i interface{}, c Context) (err error) { case strings.HasPrefix(ctype, MIMEApplicationJSON): if err = json.NewDecoder(req.Body).Decode(i); err != nil { if ute, ok := err.(*json.UnmarshalTypeError); ok { - return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unmarshal type error: expected=%v, got=%v, offset=%v", ute.Type, ute.Value, ute.Offset)) + return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unmarshal type error: expected=%v, got=%v, field=%v, offset=%v", ute.Type, ute.Value, ute.Field, ute.Offset)).SetInternal(err) } else if se, ok := err.(*json.SyntaxError); ok { - return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: offset=%v, error=%v", se.Offset, se.Error())) + return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: offset=%v, error=%v", se.Offset, se.Error())).SetInternal(err) } else { - return NewHTTPError(http.StatusBadRequest, err.Error()) + return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err) } + return NewHTTPError(http.StatusBadRequest, err.Error()) } case strings.HasPrefix(ctype, MIMEApplicationXML), strings.HasPrefix(ctype, MIMETextXML): if err = xml.NewDecoder(req.Body).Decode(i); err != nil { if ute, ok := err.(*xml.UnsupportedTypeError); ok { - return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unsupported type error: type=%v, error=%v", ute.Type, ute.Error())) + return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Unsupported type error: type=%v, error=%v", ute.Type, ute.Error())).SetInternal(err) } else if se, ok := err.(*xml.SyntaxError); ok { - return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: line=%v, error=%v", se.Line, se.Error())) + return NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Syntax error: line=%v, error=%v", se.Line, se.Error())).SetInternal(err) } else { - return NewHTTPError(http.StatusBadRequest, err.Error()) + return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err) } + return NewHTTPError(http.StatusBadRequest, err.Error()) } case strings.HasPrefix(ctype, MIMEApplicationForm), strings.HasPrefix(ctype, MIMEMultipartForm): params, err := c.FormParams() if err != nil { - return NewHTTPError(http.StatusBadRequest, err.Error()) + return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err) } if err = b.bindData(i, params, "form"); err != nil { - return NewHTTPError(http.StatusBadRequest, err.Error()) + return NewHTTPError(http.StatusBadRequest, err.Error()).SetInternal(err) } default: return ErrUnsupportedMediaType @@ -96,14 +98,29 @@ func (b *DefaultBinder) bindData(ptr interface{}, data map[string][]string, tag inputFieldName = typeField.Name // If tag is nil, we inspect if the field is a struct. if _, ok := bindUnmarshaler(structField); !ok && structFieldKind == reflect.Struct { - err := b.bindData(structField.Addr().Interface(), data, tag) - if err != nil { + if err := b.bindData(structField.Addr().Interface(), data, tag); err != nil { return err } continue } } + inputValue, exists := data[inputFieldName] + if !exists { + // Go json.Unmarshal supports case insensitive binding. However the + // url params are bound case sensitive which is inconsistent. To + // fix this we must check all of the map values in a + // case-insensitive search. + inputFieldName = strings.ToLower(inputFieldName) + for k, v := range data { + if strings.ToLower(k) == inputFieldName { + inputValue = v + exists = true + break + } + } + } + if !exists { continue } @@ -126,10 +143,9 @@ func (b *DefaultBinder) bindData(ptr interface{}, data map[string][]string, tag } } val.Field(i).Set(slice) - } else { - if err := setWithProperType(typeField.Type.Kind(), inputValue[0], structField); err != nil { - return err - } + } else if err := setWithProperType(typeField.Type.Kind(), inputValue[0], structField); err != nil { + return err + } } return nil diff --git a/src/dma/vendor/github.com/labstack/echo/context.go b/src/dma/vendor/github.com/labstack/echo/context.go index cf780c51..d4722700 100644 --- a/src/dma/vendor/github.com/labstack/echo/context.go +++ b/src/dma/vendor/github.com/labstack/echo/context.go @@ -204,6 +204,7 @@ type ( const ( defaultMemory = 32 << 20 // 32 MB indexPage = "index.html" + defaultIndent = " " ) func (c *context) writeContentType(value string) { @@ -256,14 +257,13 @@ func (c *context) Scheme() string { } func (c *context) RealIP() string { - ra := c.request.RemoteAddr if ip := c.request.Header.Get(HeaderXForwardedFor); ip != "" { - ra = strings.Split(ip, ", ")[0] - } else if ip := c.request.Header.Get(HeaderXRealIP); ip != "" { - ra = ip - } else { - ra, _, _ = net.SplitHostPort(ra) + return strings.Split(ip, ", ")[0] + } + if ip := c.request.Header.Get(HeaderXRealIP); ip != "" { + return ip } + ra, _, _ := net.SplitHostPort(c.request.RemoteAddr) return ra } @@ -404,24 +404,46 @@ func (c *context) String(code int, s string) (err error) { return c.Blob(code, MIMETextPlainCharsetUTF8, []byte(s)) } -func (c *context) JSON(code int, i interface{}) (err error) { +func (c *context) jsonPBlob(code int, callback string, i interface{}) (err error) { + enc := json.NewEncoder(c.response) _, pretty := c.QueryParams()["pretty"] if c.echo.Debug || pretty { - return c.JSONPretty(code, i, " ") + enc.SetIndent("", " ") } - b, err := json.Marshal(i) - if err != nil { + c.writeContentType(MIMEApplicationJavaScriptCharsetUTF8) + c.response.WriteHeader(code) + if _, err = c.response.Write([]byte(callback + "(")); err != nil { + return + } + if err = enc.Encode(i); err != nil { + return + } + if _, err = c.response.Write([]byte(");")); err != nil { return } - return c.JSONBlob(code, b) + return } -func (c *context) JSONPretty(code int, i interface{}, indent string) (err error) { - b, err := json.MarshalIndent(i, "", indent) - if err != nil { - return +func (c *context) json(code int, i interface{}, indent string) error { + enc := json.NewEncoder(c.response) + if indent != "" { + enc.SetIndent("", indent) + } + c.writeContentType(MIMEApplicationJSONCharsetUTF8) + c.response.WriteHeader(code) + return enc.Encode(i) +} + +func (c *context) JSON(code int, i interface{}) (err error) { + indent := "" + if _, pretty := c.QueryParams()["pretty"]; c.echo.Debug || pretty { + indent = defaultIndent } - return c.JSONBlob(code, b) + return c.json(code, i, indent) +} + +func (c *context) JSONPretty(code int, i interface{}, indent string) (err error) { + return c.json(code, i, indent) } func (c *context) JSONBlob(code int, b []byte) (err error) { @@ -429,11 +451,7 @@ func (c *context) JSONBlob(code int, b []byte) (err error) { } func (c *context) JSONP(code int, callback string, i interface{}) (err error) { - b, err := json.Marshal(i) - if err != nil { - return - } - return c.JSONPBlob(code, callback, b) + return c.jsonPBlob(code, callback, i) } func (c *context) JSONPBlob(code int, callback string, b []byte) (err error) { @@ -449,24 +467,29 @@ func (c *context) JSONPBlob(code int, callback string, b []byte) (err error) { return } -func (c *context) XML(code int, i interface{}) (err error) { - _, pretty := c.QueryParams()["pretty"] - if c.echo.Debug || pretty { - return c.XMLPretty(code, i, " ") +func (c *context) xml(code int, i interface{}, indent string) (err error) { + c.writeContentType(MIMEApplicationXMLCharsetUTF8) + c.response.WriteHeader(code) + enc := xml.NewEncoder(c.response) + if indent != "" { + enc.Indent("", indent) } - b, err := xml.Marshal(i) - if err != nil { + if _, err = c.response.Write([]byte(xml.Header)); err != nil { return } - return c.XMLBlob(code, b) + return enc.Encode(i) } -func (c *context) XMLPretty(code int, i interface{}, indent string) (err error) { - b, err := xml.MarshalIndent(i, "", indent) - if err != nil { - return +func (c *context) XML(code int, i interface{}) (err error) { + indent := "" + if _, pretty := c.QueryParams()["pretty"]; c.echo.Debug || pretty { + indent = defaultIndent } - return c.XMLBlob(code, b) + return c.xml(code, i, indent) +} + +func (c *context) XMLPretty(code int, i interface{}, indent string) (err error) { + return c.xml(code, i, indent) } func (c *context) XMLBlob(code int, b []byte) (err error) { @@ -574,3 +597,4 @@ func (c *context) Reset(r *http.Request, w http.ResponseWriter) { // NOTE: Don't reset because it has to have length c.echo.maxParam at all times // c.pvalues = nil } + diff --git a/src/dma/vendor/github.com/labstack/echo/echo.go b/src/dma/vendor/github.com/labstack/echo/echo.go index 41ac6b5e..98286515 100644 --- a/src/dma/vendor/github.com/labstack/echo/echo.go +++ b/src/dma/vendor/github.com/labstack/echo/echo.go @@ -62,7 +62,7 @@ import ( type ( // Echo is the top-level framework instance. Echo struct { - stdLogger *stdLog.Logger + StdLogger *stdLog.Logger colorer *color.Color premiddleware []MiddlewareFunc middleware []MiddlewareFunc @@ -103,7 +103,7 @@ type ( // MiddlewareFunc defines a function to process middleware. MiddlewareFunc func(HandlerFunc) HandlerFunc - // HandlerFunc defines a function to server HTTP requests. + // HandlerFunc defines a function to serve HTTP requests. HandlerFunc func(Context) error // HTTPErrorHandler is a centralized HTTP error handler. @@ -129,17 +129,18 @@ type ( ) // HTTP methods +// NOTE: Deprecated, please use the stdlib constants directly instead. const ( - CONNECT = "CONNECT" - DELETE = "DELETE" - GET = "GET" - HEAD = "HEAD" - OPTIONS = "OPTIONS" - PATCH = "PATCH" - POST = "POST" - PROPFIND = "PROPFIND" - PUT = "PUT" - TRACE = "TRACE" + CONNECT = http.MethodConnect + DELETE = http.MethodDelete + GET = http.MethodGet + HEAD = http.MethodHead + OPTIONS = http.MethodOptions + PATCH = http.MethodPatch + POST = http.MethodPost + // PROPFIND = "PROPFIND" + PUT = http.MethodPut + TRACE = http.MethodTrace ) // MIME types @@ -165,6 +166,8 @@ const ( const ( charsetUTF8 = "charset=UTF-8" + // PROPFIND Method can be used on collection and property resources. + PROPFIND = "PROPFIND" ) // Headers @@ -217,7 +220,8 @@ const ( ) const ( - Version = "3.3.5" + // Version of Echo + Version = "3.3.10-dev" website = "https://echo.labstack.com" // http://patorjk.com/software/taag/#p=display&f=Small%20Slant&t=Echo banner = ` @@ -234,16 +238,16 @@ ____________________________________O/_______ var ( methods = [...]string{ - CONNECT, - DELETE, - GET, - HEAD, - OPTIONS, - PATCH, - POST, + http.MethodConnect, + http.MethodDelete, + http.MethodGet, + http.MethodHead, + http.MethodOptions, + http.MethodPatch, + http.MethodPost, PROPFIND, - PUT, - TRACE, + http.MethodPut, + http.MethodTrace, } ) @@ -255,6 +259,12 @@ var ( ErrForbidden = NewHTTPError(http.StatusForbidden) ErrMethodNotAllowed = NewHTTPError(http.StatusMethodNotAllowed) ErrStatusRequestEntityTooLarge = NewHTTPError(http.StatusRequestEntityTooLarge) + ErrTooManyRequests = NewHTTPError(http.StatusTooManyRequests) + ErrBadRequest = NewHTTPError(http.StatusBadRequest) + ErrBadGateway = NewHTTPError(http.StatusBadGateway) + ErrInternalServerError = NewHTTPError(http.StatusInternalServerError) + ErrRequestTimeout = NewHTTPError(http.StatusRequestTimeout) + ErrServiceUnavailable = NewHTTPError(http.StatusServiceUnavailable) ErrValidatorNotRegistered = errors.New("validator not registered") ErrRendererNotRegistered = errors.New("renderer not registered") ErrInvalidRedirectCode = errors.New("invalid redirect status code") @@ -289,7 +299,7 @@ func New() (e *Echo) { e.HTTPErrorHandler = e.DefaultHTTPErrorHandler e.Binder = &DefaultBinder{} e.Logger.SetLevel(log.ERROR) - e.stdLogger = stdLog.New(e.Logger.Output(), e.Logger.Prefix()+": ", 0) + e.StdLogger = stdLog.New(e.Logger.Output(), e.Logger.Prefix()+": ", 0) e.pool.New = func() interface{} { return e.NewContext(nil, nil) } @@ -326,7 +336,7 @@ func (e *Echo) DefaultHTTPErrorHandler(err error, c Context) { code = he.Code msg = he.Message if he.Internal != nil { - msg = fmt.Sprintf("%v, %v", err, he.Internal) + err = fmt.Errorf("%v, %v", err, he.Internal) } } else if e.Debug { msg = err.Error() @@ -337,11 +347,9 @@ func (e *Echo) DefaultHTTPErrorHandler(err error, c Context) { msg = Map{"message": msg} } - e.Logger.Error(err) - // Send response if !c.Response().Committed { - if c.Request().Method == HEAD { // Issue #608 + if c.Request().Method == http.MethodHead { // Issue #608 err = c.NoContent(code) } else { err = c.JSON(code, msg) @@ -365,55 +373,55 @@ func (e *Echo) Use(middleware ...MiddlewareFunc) { // CONNECT registers a new CONNECT route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) CONNECT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(CONNECT, path, h, m...) + return e.Add(http.MethodConnect, path, h, m...) } // DELETE registers a new DELETE route for a path with matching handler in the router // with optional route-level middleware. func (e *Echo) DELETE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(DELETE, path, h, m...) + return e.Add(http.MethodDelete, path, h, m...) } // GET registers a new GET route for a path with matching handler in the router // with optional route-level middleware. func (e *Echo) GET(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(GET, path, h, m...) + return e.Add(http.MethodGet, path, h, m...) } // HEAD registers a new HEAD route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) HEAD(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(HEAD, path, h, m...) + return e.Add(http.MethodHead, path, h, m...) } // OPTIONS registers a new OPTIONS route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) OPTIONS(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(OPTIONS, path, h, m...) + return e.Add(http.MethodOptions, path, h, m...) } // PATCH registers a new PATCH route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) PATCH(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(PATCH, path, h, m...) + return e.Add(http.MethodPatch, path, h, m...) } // POST registers a new POST route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) POST(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(POST, path, h, m...) + return e.Add(http.MethodPost, path, h, m...) } // PUT registers a new PUT route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) PUT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(PUT, path, h, m...) + return e.Add(http.MethodPut, path, h, m...) } // TRACE registers a new TRACE route for a path with matching handler in the // router with optional route-level middleware. func (e *Echo) TRACE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return e.Add(TRACE, path, h, m...) + return e.Add(http.MethodTrace, path, h, m...) } // Any registers a new route for all HTTP methods and path with matching handler @@ -462,11 +470,11 @@ func static(i i, prefix, root string) *Route { return i.GET(prefix+"/*", h) } -// File registers a new route with path to serve a static file. -func (e *Echo) File(path, file string) *Route { +// File registers a new route with path to serve a static file with optional route-level middleware. +func (e *Echo) File(path, file string, m ...MiddlewareFunc) *Route { return e.GET(path, func(c Context) error { return c.File(file) - }) + }, m...) } // Add registers a new route for an HTTP method and path with matching handler @@ -559,26 +567,17 @@ func (e *Echo) ServeHTTP(w http.ResponseWriter, r *http.Request) { c := e.pool.Get().(*context) c.Reset(r, w) - m := r.Method h := NotFoundHandler if e.premiddleware == nil { - path := r.URL.RawPath - if path == "" { - path = r.URL.Path - } - e.router.Find(m, getPath(r), c) + e.router.Find(r.Method, getPath(r), c) h = c.Handler() for i := len(e.middleware) - 1; i >= 0; i-- { h = e.middleware[i](h) } } else { h = func(c Context) error { - path := r.URL.RawPath - if path == "" { - path = r.URL.Path - } - e.router.Find(m, getPath(r), c) + e.router.Find(r.Method, getPath(r), c) h := c.Handler() for i := len(e.middleware) - 1; i >= 0; i-- { h = e.middleware[i](h) @@ -622,10 +621,6 @@ func (e *Echo) StartTLS(address string, certFile, keyFile string) (err error) { // StartAutoTLS starts an HTTPS server using certificates automatically installed from https://letsencrypt.org. func (e *Echo) StartAutoTLS(address string) error { - if e.Listener == nil { - go http.ListenAndServe(":http", e.AutoTLSManager.HTTPHandler(nil)) - } - s := e.TLSServer s.TLSConfig = new(tls.Config) s.TLSConfig.GetCertificate = e.AutoTLSManager.GetCertificate @@ -645,7 +640,7 @@ func (e *Echo) startTLS(address string) error { func (e *Echo) StartServer(s *http.Server) (err error) { // Setup e.colorer.SetOutput(e.Logger.Output()) - s.ErrorLog = e.stdLogger + s.ErrorLog = e.StdLogger s.Handler = e if e.Debug { e.Logger.SetLevel(log.DEBUG) @@ -689,7 +684,7 @@ func (e *Echo) Close() error { return e.Server.Close() } -// Shutdown stops server the gracefully. +// Shutdown stops the server gracefully. // It internally calls `http.Server#Shutdown()`. func (e *Echo) Shutdown(ctx stdContext.Context) error { if err := e.TLSServer.Shutdown(ctx); err != nil { @@ -712,6 +707,12 @@ func (he *HTTPError) Error() string { return fmt.Sprintf("code=%d, message=%v", he.Code, he.Message) } +// SetInternal sets error to HTTPError.Internal +func (he *HTTPError) SetInternal(err error) *HTTPError { + he.Internal = err + return he +} + // WrapHandler wraps `http.Handler` into `echo.HandlerFunc`. func WrapHandler(h http.Handler) HandlerFunc { return func(c Context) error { diff --git a/src/dma/vendor/github.com/labstack/echo/group.go b/src/dma/vendor/github.com/labstack/echo/group.go index 5257e83c..3e3732b6 100644 --- a/src/dma/vendor/github.com/labstack/echo/group.go +++ b/src/dma/vendor/github.com/labstack/echo/group.go @@ -1,6 +1,7 @@ package echo import ( + "net/http" "path" ) @@ -29,47 +30,47 @@ func (g *Group) Use(middleware ...MiddlewareFunc) { // CONNECT implements `Echo#CONNECT()` for sub-routes within the Group. func (g *Group) CONNECT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(CONNECT, path, h, m...) + return g.Add(http.MethodConnect, path, h, m...) } // DELETE implements `Echo#DELETE()` for sub-routes within the Group. func (g *Group) DELETE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(DELETE, path, h, m...) + return g.Add(http.MethodDelete, path, h, m...) } // GET implements `Echo#GET()` for sub-routes within the Group. func (g *Group) GET(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(GET, path, h, m...) + return g.Add(http.MethodGet, path, h, m...) } // HEAD implements `Echo#HEAD()` for sub-routes within the Group. func (g *Group) HEAD(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(HEAD, path, h, m...) + return g.Add(http.MethodHead, path, h, m...) } // OPTIONS implements `Echo#OPTIONS()` for sub-routes within the Group. func (g *Group) OPTIONS(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(OPTIONS, path, h, m...) + return g.Add(http.MethodOptions, path, h, m...) } // PATCH implements `Echo#PATCH()` for sub-routes within the Group. func (g *Group) PATCH(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(PATCH, path, h, m...) + return g.Add(http.MethodPatch, path, h, m...) } // POST implements `Echo#POST()` for sub-routes within the Group. func (g *Group) POST(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(POST, path, h, m...) + return g.Add(http.MethodPost, path, h, m...) } // PUT implements `Echo#PUT()` for sub-routes within the Group. func (g *Group) PUT(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(PUT, path, h, m...) + return g.Add(http.MethodPut, path, h, m...) } // TRACE implements `Echo#TRACE()` for sub-routes within the Group. func (g *Group) TRACE(path string, h HandlerFunc, m ...MiddlewareFunc) *Route { - return g.Add(TRACE, path, h, m...) + return g.Add(http.MethodTrace, path, h, m...) } // Any implements `Echo#Any()` for sub-routes within the Group. diff --git a/src/dma/vendor/github.com/labstack/echo/log.go b/src/dma/vendor/github.com/labstack/echo/log.go index b194c39c..3f8de590 100644 --- a/src/dma/vendor/github.com/labstack/echo/log.go +++ b/src/dma/vendor/github.com/labstack/echo/log.go @@ -15,6 +15,7 @@ type ( SetPrefix(p string) Level() log.Lvl SetLevel(v log.Lvl) + SetHeader(h string) Print(i ...interface{}) Printf(format string, args ...interface{}) Printj(j log.JSON) diff --git a/src/dma/vendor/github.com/labstack/echo/router.go b/src/dma/vendor/github.com/labstack/echo/router.go index ff53da87..73f0b68b 100644 --- a/src/dma/vendor/github.com/labstack/echo/router.go +++ b/src/dma/vendor/github.com/labstack/echo/router.go @@ -1,5 +1,7 @@ package echo +import "net/http" + type ( // Router is the registry of all registered routes for an `Echo` instance for // request matching and URL path parameter parsing. @@ -79,7 +81,7 @@ func (r *Router) Add(method, path string, h HandlerFunc) { r.insert(method, path[:i], h, pkind, ppath, pnames) return } - r.insert(method, path[:i], nil, pkind, ppath, pnames) + r.insert(method, path[:i], nil, pkind, "", nil) } else if path[i] == '*' { r.insert(method, path[:i], nil, skind, "", nil) pnames = append(pnames, "*") @@ -226,50 +228,50 @@ func (n *node) findChildByKind(t kind) *node { func (n *node) addHandler(method string, h HandlerFunc) { switch method { - case CONNECT: + case http.MethodConnect: n.methodHandler.connect = h - case DELETE: + case http.MethodDelete: n.methodHandler.delete = h - case GET: + case http.MethodGet: n.methodHandler.get = h - case HEAD: + case http.MethodHead: n.methodHandler.head = h - case OPTIONS: + case http.MethodOptions: n.methodHandler.options = h - case PATCH: + case http.MethodPatch: n.methodHandler.patch = h - case POST: + case http.MethodPost: n.methodHandler.post = h case PROPFIND: n.methodHandler.propfind = h - case PUT: + case http.MethodPut: n.methodHandler.put = h - case TRACE: + case http.MethodTrace: n.methodHandler.trace = h } } func (n *node) findHandler(method string) HandlerFunc { switch method { - case CONNECT: + case http.MethodConnect: return n.methodHandler.connect - case DELETE: + case http.MethodDelete: return n.methodHandler.delete - case GET: + case http.MethodGet: return n.methodHandler.get - case HEAD: + case http.MethodHead: return n.methodHandler.head - case OPTIONS: + case http.MethodOptions: return n.methodHandler.options - case PATCH: + case http.MethodPatch: return n.methodHandler.patch - case POST: + case http.MethodPost: return n.methodHandler.post case PROPFIND: return n.methodHandler.propfind - case PUT: + case http.MethodPut: return n.methodHandler.put - case TRACE: + case http.MethodTrace: return n.methodHandler.trace default: return nil @@ -311,7 +313,7 @@ func (r *Router) Find(method, path string, c Context) { // Search order static > param > any for { if search == "" { - goto End + break } pl := 0 // Prefix length @@ -346,7 +348,7 @@ func (r *Router) Find(method, path string, c Context) { } if search == "" { - goto End + break } // Static node @@ -403,10 +405,9 @@ func (r *Router) Find(method, path string, c Context) { return } pvalues[len(cn.pnames)-1] = search - goto End + break } -End: ctx.handler = cn.findHandler(method) ctx.path = cn.ppath ctx.pnames = cn.pnames diff --git a/src/dma/vendor/github.com/labstack/gommon/LICENSE b/src/dma/vendor/github.com/labstack/gommon/LICENSE index d2ae3edf..fc718faf 100644 --- a/src/dma/vendor/github.com/labstack/gommon/LICENSE +++ b/src/dma/vendor/github.com/labstack/gommon/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2015 labstack +Copyright (c) 2018 labstack Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/dma/vendor/github.com/labstack/gommon/log/log.go b/src/dma/vendor/github.com/labstack/gommon/log/log.go index 0d77a87a..132411db 100644 --- a/src/dma/vendor/github.com/labstack/gommon/log/log.go +++ b/src/dma/vendor/github.com/labstack/gommon/log/log.go @@ -219,7 +219,7 @@ func (l *Logger) Panic(i ...interface{}) { func (l *Logger) Panicf(format string, args ...interface{}) { l.log(panicLevel, format, args...) - panic(fmt.Sprintf(format, args)) + panic(fmt.Sprintf(format, args...)) } func (l *Logger) Panicj(j JSON) { diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go b/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go index 3a3ef5b7..8c3960cc 100644 --- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go +++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain.go @@ -593,6 +593,7 @@ const ( DOMAIN_SHUTOFF_SAVED = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_SAVED) DOMAIN_SHUTOFF_FAILED = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_FAILED) DOMAIN_SHUTOFF_FROM_SNAPSHOT = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_FROM_SNAPSHOT) + DOMAIN_SHUTOFF_DAEMON = DomainShutoffReason(C.VIR_DOMAIN_SHUTOFF_DAEMON) ) type DomainBlockCommitFlags int @@ -768,6 +769,7 @@ const ( DOMAIN_STATS_INTERFACE = DomainStatsTypes(C.VIR_DOMAIN_STATS_INTERFACE) DOMAIN_STATS_BLOCK = DomainStatsTypes(C.VIR_DOMAIN_STATS_BLOCK) DOMAIN_STATS_PERF = DomainStatsTypes(C.VIR_DOMAIN_STATS_PERF) + DOMAIN_STATS_IOTHREAD = DomainStatsTypes(C.VIR_DOMAIN_STATS_IOTHREAD) ) type DomainCoreDumpFlags int @@ -4206,6 +4208,57 @@ func (d *Domain) DelIOThread(id uint, flags DomainModificationImpact) error { return nil } +// See also https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainSetIOThreadParams + +type DomainSetIOThreadParams struct { + PollMaxNsSet bool + PollMaxNs uint64 + PollGrowSet bool + PollGrow uint + PollShrinkSet bool + PollShrink uint +} + +func getSetIOThreadParamsFieldInfo(params *DomainSetIOThreadParams) map[string]typedParamsFieldInfo { + return map[string]typedParamsFieldInfo{ + C.VIR_DOMAIN_IOTHREAD_POLL_MAX_NS: typedParamsFieldInfo{ + set: ¶ms.PollMaxNsSet, + ul: ¶ms.PollMaxNs, + }, + C.VIR_DOMAIN_IOTHREAD_POLL_GROW: typedParamsFieldInfo{ + set: ¶ms.PollGrowSet, + ui: ¶ms.PollGrow, + }, + C.VIR_DOMAIN_IOTHREAD_POLL_SHRINK: typedParamsFieldInfo{ + set: ¶ms.PollShrinkSet, + ui: ¶ms.PollShrink, + }, + } +} + +func (d *Domain) SetIOThreadParams(iothreadid uint, params *DomainSetIOThreadParams, flags DomainModificationImpact) error { + if C.LIBVIR_VERSION_NUMBER < 4010000 { + return makeNotImplementedError("virDomainSetIOThreadParams") + } + info := getSetIOThreadParamsFieldInfo(params) + + cparams, gerr := typedParamsPackNew(info) + if gerr != nil { + return gerr + } + nparams := len(*cparams) + + defer C.virTypedParamsClear((*C.virTypedParameter)(unsafe.Pointer(&(*cparams)[0])), C.int(nparams)) + + var err C.virError + ret := C.virDomainSetIOThreadParamsWrapper(d.ptr, C.uint(iothreadid), (*C.virTypedParameter)(unsafe.Pointer(&(*cparams)[0])), C.int(nparams), C.uint(flags), &err) + if ret == -1 { + return makeError(&err) + } + + return nil +} + // See also https://libvirt.org/html/libvirt-libvirt-domain.html#virDomainGetEmulatorPinInfo func (d *Domain) GetEmulatorPinInfo(flags DomainModificationImpact) ([]bool, error) { var cnodeinfo C.virNodeInfo diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h index d20631fc..19a3e24e 100644 --- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h +++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_compat.h @@ -895,7 +895,7 @@ struct _virDomainInterface { #endif #ifndef VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_ARP -#define VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_ARP 1 +#define VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_ARP 2 #endif /* 4.5.0 */ @@ -911,4 +911,26 @@ struct _virDomainInterface { #define VIR_DOMAIN_MEMORY_STAT_DISK_CACHES 10 #endif +/* 4.10.0 */ + +#ifndef VIR_DOMAIN_SHUTOFF_DAEMON +#define VIR_DOMAIN_SHUTOFF_DAEMON 8 +#endif + +#ifndef VIR_DOMAIN_STATS_IOTHREAD +#define VIR_DOMAIN_STATS_IOTHREAD (1 << 7) +#endif + +#ifndef VIR_DOMAIN_IOTHREAD_POLL_GROW +#define VIR_DOMAIN_IOTHREAD_POLL_GROW "poll_grow" +#endif + +#ifndef VIR_DOMAIN_IOTHREAD_POLL_SHRINK +#define VIR_DOMAIN_IOTHREAD_POLL_SHRINK "poll_shrink" +#endif + +#ifndef VIR_DOMAIN_IOTHREAD_POLL_MAX_NS +#define VIR_DOMAIN_IOTHREAD_POLL_MAX_NS "poll_max_ns" +#endif + #endif /* LIBVIRT_GO_DOMAIN_COMPAT_H__ */ diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go index b42dd42f..f674bd51 100644 --- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go +++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.go @@ -1913,6 +1913,26 @@ virDomainSetGuestVcpusWrapper(virDomainPtr domain, } +int +virDomainSetIOThreadParamsWrapper(virDomainPtr domain, + unsigned int iothread_id, + virTypedParameterPtr params, + int nparams, + unsigned int flags, + virErrorPtr err) +{ +#if LIBVIR_VERSION_NUMBER < 4010000 + assert(0); // Caller should have checked version +#else + int ret = virDomainSetIOThreadParams(domain, iothread_id, params, nparams, flags); + if (ret < 0) { + virCopyLastError(err); + } + return ret; +#endif +} + + int virDomainSetInterfaceParametersWrapper(virDomainPtr domain, const char *device, diff --git a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h index 7bd82826..48a4cd39 100644 --- a/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h +++ b/src/dma/vendor/github.com/libvirt/libvirt-go/domain_wrapper.h @@ -813,6 +813,14 @@ virDomainSetGuestVcpusWrapper(virDomainPtr domain, unsigned int flags, virErrorPtr err); +int +virDomainSetIOThreadParamsWrapper(virDomainPtr domain, + unsigned int iothread_id, + virTypedParameterPtr params, + int nparams, + unsigned int flags, + virErrorPtr err); + int virDomainSetInterfaceParametersWrapper(virDomainPtr domain, const char *device, diff --git a/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go b/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go index e17a5474..404e10ca 100644 --- a/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go +++ b/src/dma/vendor/github.com/mattn/go-colorable/colorable_windows.go @@ -29,6 +29,15 @@ const ( backgroundMask = (backgroundRed | backgroundBlue | backgroundGreen | backgroundIntensity) ) +const ( + genericRead = 0x80000000 + genericWrite = 0x40000000 +) + +const ( + consoleTextmodeBuffer = 0x1 +) + type wchar uint16 type short int16 type dword uint32 @@ -69,14 +78,17 @@ var ( procGetConsoleCursorInfo = kernel32.NewProc("GetConsoleCursorInfo") procSetConsoleCursorInfo = kernel32.NewProc("SetConsoleCursorInfo") procSetConsoleTitle = kernel32.NewProc("SetConsoleTitleW") + procCreateConsoleScreenBuffer = kernel32.NewProc("CreateConsoleScreenBuffer") ) // Writer provide colorable Writer to the console type Writer struct { - out io.Writer - handle syscall.Handle - oldattr word - oldpos coord + out io.Writer + handle syscall.Handle + althandle syscall.Handle + oldattr word + oldpos coord + rest bytes.Buffer } // NewColorable return new instance of Writer which handle escape sequence from File. @@ -407,7 +419,18 @@ func (w *Writer) Write(data []byte) (n int, err error) { var csbi consoleScreenBufferInfo procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) - er := bytes.NewReader(data) + handle := w.handle + + var er *bytes.Reader + if w.rest.Len() > 0 { + var rest bytes.Buffer + w.rest.WriteTo(&rest) + w.rest.Reset() + rest.Write(data) + er = bytes.NewReader(rest.Bytes()) + } else { + er = bytes.NewReader(data) + } var bw [1]byte loop: for { @@ -425,29 +448,55 @@ loop: break loop } - if c2 == ']' { - if err := doTitleSequence(er); err != nil { + switch c2 { + case '>': + continue + case ']': + w.rest.WriteByte(c1) + w.rest.WriteByte(c2) + er.WriteTo(&w.rest) + if bytes.IndexByte(w.rest.Bytes(), 0x07) == -1 { break loop } + er = bytes.NewReader(w.rest.Bytes()[2:]) + err := doTitleSequence(er) + if err != nil { + break loop + } + w.rest.Reset() continue - } - if c2 != 0x5b { + // https://github.com/mattn/go-colorable/issues/27 + case '7': + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) + w.oldpos = csbi.cursorPosition + continue + case '8': + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&w.oldpos))) + continue + case 0x5b: + // execute part after switch + default: continue } + w.rest.WriteByte(c1) + w.rest.WriteByte(c2) + er.WriteTo(&w.rest) + var buf bytes.Buffer var m byte - for { - c, err := er.ReadByte() - if err != nil { - break loop - } + for i, c := range w.rest.Bytes()[2:] { if ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || c == '@' { m = c + er = bytes.NewReader(w.rest.Bytes()[2+i+1:]) + w.rest.Reset() break } buf.Write([]byte(string(c))) } + if m == 0 { + break loop + } switch m { case 'A': @@ -455,61 +504,64 @@ loop: if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.y -= short(n) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'B': n, err = strconv.Atoi(buf.String()) if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.y += short(n) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'C': n, err = strconv.Atoi(buf.String()) if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.x += short(n) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'D': n, err = strconv.Atoi(buf.String()) if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.x -= short(n) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + if csbi.cursorPosition.x < 0 { + csbi.cursorPosition.x = 0 + } + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'E': n, err = strconv.Atoi(buf.String()) if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.x = 0 csbi.cursorPosition.y += short(n) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'F': n, err = strconv.Atoi(buf.String()) if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.x = 0 csbi.cursorPosition.y -= short(n) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'G': n, err = strconv.Atoi(buf.String()) if err != nil { continue } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) csbi.cursorPosition.x = short(n - 1) - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'H', 'f': - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) if buf.Len() > 0 { token := strings.Split(buf.String(), ";") switch len(token) { @@ -534,7 +586,7 @@ loop: } else { csbi.cursorPosition.y = 0 } - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&csbi.cursorPosition))) case 'J': n := 0 if buf.Len() > 0 { @@ -545,20 +597,20 @@ loop: } var count, written dword var cursor coord - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) switch n { case 0: cursor = coord{x: csbi.cursorPosition.x, y: csbi.cursorPosition.y} - count = dword(csbi.size.x - csbi.cursorPosition.x + (csbi.size.y-csbi.cursorPosition.y)*csbi.size.x) + count = dword(csbi.size.x) - dword(csbi.cursorPosition.x) + dword(csbi.size.y-csbi.cursorPosition.y)*dword(csbi.size.x) case 1: cursor = coord{x: csbi.window.left, y: csbi.window.top} - count = dword(csbi.size.x - csbi.cursorPosition.x + (csbi.window.top-csbi.cursorPosition.y)*csbi.size.x) + count = dword(csbi.size.x) - dword(csbi.cursorPosition.x) + dword(csbi.window.top-csbi.cursorPosition.y)*dword(csbi.size.x) case 2: cursor = coord{x: csbi.window.left, y: csbi.window.top} - count = dword(csbi.size.x - csbi.cursorPosition.x + (csbi.size.y-csbi.cursorPosition.y)*csbi.size.x) + count = dword(csbi.size.x) - dword(csbi.cursorPosition.x) + dword(csbi.size.y-csbi.cursorPosition.y)*dword(csbi.size.x) } - procFillConsoleOutputCharacter.Call(uintptr(w.handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) - procFillConsoleOutputAttribute.Call(uintptr(w.handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) + procFillConsoleOutputCharacter.Call(uintptr(handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) + procFillConsoleOutputAttribute.Call(uintptr(handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) case 'K': n := 0 if buf.Len() > 0 { @@ -567,28 +619,28 @@ loop: continue } } - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) var cursor coord var count, written dword switch n { case 0: - cursor = coord{x: csbi.cursorPosition.x + 1, y: csbi.cursorPosition.y} - count = dword(csbi.size.x - csbi.cursorPosition.x - 1) + cursor = coord{x: csbi.cursorPosition.x, y: csbi.cursorPosition.y} + count = dword(csbi.size.x - csbi.cursorPosition.x) case 1: - cursor = coord{x: csbi.window.left, y: csbi.window.top + csbi.cursorPosition.y} + cursor = coord{x: csbi.window.left, y: csbi.cursorPosition.y} count = dword(csbi.size.x - csbi.cursorPosition.x) case 2: - cursor = coord{x: csbi.window.left, y: csbi.window.top + csbi.cursorPosition.y} + cursor = coord{x: csbi.window.left, y: csbi.cursorPosition.y} count = dword(csbi.size.x) } - procFillConsoleOutputCharacter.Call(uintptr(w.handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) - procFillConsoleOutputAttribute.Call(uintptr(w.handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) + procFillConsoleOutputCharacter.Call(uintptr(handle), uintptr(' '), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) + procFillConsoleOutputAttribute.Call(uintptr(handle), uintptr(csbi.attributes), uintptr(count), *(*uintptr)(unsafe.Pointer(&cursor)), uintptr(unsafe.Pointer(&written))) case 'm': - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) attr := csbi.attributes cs := buf.String() if cs == "" { - procSetConsoleTextAttribute.Call(uintptr(w.handle), uintptr(w.oldattr)) + procSetConsoleTextAttribute.Call(uintptr(handle), uintptr(w.oldattr)) continue } token := strings.Split(cs, ";") @@ -627,6 +679,21 @@ loop: attr |= n256foreAttr[n256] i += 2 } + } else if len(token) == 5 && token[i+1] == "2" { + var r, g, b int + r, _ = strconv.Atoi(token[i+2]) + g, _ = strconv.Atoi(token[i+3]) + b, _ = strconv.Atoi(token[i+4]) + i += 4 + if r > 127 { + attr |= foregroundRed + } + if g > 127 { + attr |= foregroundGreen + } + if b > 127 { + attr |= foregroundBlue + } } else { attr = attr & (w.oldattr & backgroundMask) } @@ -654,6 +721,21 @@ loop: attr |= n256backAttr[n256] i += 2 } + } else if len(token) == 5 && token[i+1] == "2" { + var r, g, b int + r, _ = strconv.Atoi(token[i+2]) + g, _ = strconv.Atoi(token[i+3]) + b, _ = strconv.Atoi(token[i+4]) + i += 4 + if r > 127 { + attr |= backgroundRed + } + if g > 127 { + attr |= backgroundGreen + } + if b > 127 { + attr |= backgroundBlue + } } else { attr = attr & (w.oldattr & foregroundMask) } @@ -685,38 +767,52 @@ loop: attr |= backgroundBlue } } - procSetConsoleTextAttribute.Call(uintptr(w.handle), uintptr(attr)) + procSetConsoleTextAttribute.Call(uintptr(handle), uintptr(attr)) } } case 'h': var ci consoleCursorInfo cs := buf.String() if cs == "5>" { - procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) ci.visible = 0 - procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) } else if cs == "?25" { - procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) ci.visible = 1 - procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) + } else if cs == "?1049" { + if w.althandle == 0 { + h, _, _ := procCreateConsoleScreenBuffer.Call(uintptr(genericRead|genericWrite), 0, 0, uintptr(consoleTextmodeBuffer), 0, 0) + w.althandle = syscall.Handle(h) + if w.althandle != 0 { + handle = w.althandle + } + } } case 'l': var ci consoleCursorInfo cs := buf.String() if cs == "5>" { - procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) ci.visible = 1 - procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) } else if cs == "?25" { - procGetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procGetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) ci.visible = 0 - procSetConsoleCursorInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&ci))) + procSetConsoleCursorInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&ci))) + } else if cs == "?1049" { + if w.althandle != 0 { + syscall.CloseHandle(w.althandle) + w.althandle = 0 + handle = w.handle + } } case 's': - procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi))) + procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi))) w.oldpos = csbi.cursorPosition case 'u': - procSetConsoleCursorPosition.Call(uintptr(w.handle), *(*uintptr)(unsafe.Pointer(&w.oldpos))) + procSetConsoleCursorPosition.Call(uintptr(handle), *(*uintptr)(unsafe.Pointer(&w.oldpos))) } } diff --git a/src/dma/vendor/github.com/mattn/go-colorable/go.mod b/src/dma/vendor/github.com/mattn/go-colorable/go.mod new file mode 100644 index 00000000..9d9f4248 --- /dev/null +++ b/src/dma/vendor/github.com/mattn/go-colorable/go.mod @@ -0,0 +1,3 @@ +module github.com/mattn/go-colorable + +require github.com/mattn/go-isatty v0.0.5 diff --git a/src/dma/vendor/github.com/mattn/go-colorable/go.sum b/src/dma/vendor/github.com/mattn/go-colorable/go.sum new file mode 100644 index 00000000..2c12960e --- /dev/null +++ b/src/dma/vendor/github.com/mattn/go-colorable/go.sum @@ -0,0 +1,4 @@ +github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml b/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml index b9f8b239..5597e026 100644 --- a/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml +++ b/src/dma/vendor/github.com/mattn/go-isatty/.travis.yml @@ -2,6 +2,10 @@ language: go go: - tip +os: + - linux + - osx + before_install: - go get github.com/mattn/goveralls - go get golang.org/x/tools/cmd/cover diff --git a/src/dma/vendor/github.com/mattn/go-isatty/go.mod b/src/dma/vendor/github.com/mattn/go-isatty/go.mod new file mode 100644 index 00000000..f310320c --- /dev/null +++ b/src/dma/vendor/github.com/mattn/go-isatty/go.mod @@ -0,0 +1,3 @@ +module github.com/mattn/go-isatty + +require golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 diff --git a/src/dma/vendor/github.com/mattn/go-isatty/go.sum b/src/dma/vendor/github.com/mattn/go-isatty/go.sum new file mode 100644 index 00000000..426c8973 --- /dev/null +++ b/src/dma/vendor/github.com/mattn/go-isatty/go.sum @@ -0,0 +1,2 @@ +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go new file mode 100644 index 00000000..d3567cb5 --- /dev/null +++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_android.go @@ -0,0 +1,23 @@ +// +build android + +package isatty + +import ( + "syscall" + "unsafe" +) + +const ioctlReadTermios = syscall.TCGETS + +// IsTerminal return true if the file descriptor is terminal. +func IsTerminal(fd uintptr) bool { + var termios syscall.Termios + _, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0) + return err == 0 +} + +// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2 +// terminal. This is also always false on this environment. +func IsCygwinTerminal(fd uintptr) bool { + return false +} diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go deleted file mode 100644 index 9584a988..00000000 --- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_appengine.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build appengine - -package isatty - -// IsTerminal returns true if the file descriptor is terminal which -// is always false on on appengine classic which is a sandboxed PaaS. -func IsTerminal(fd uintptr) bool { - return false -} - -// IsCygwinTerminal() return true if the file descriptor is a cygwin or msys2 -// terminal. This is also always false on this environment. -func IsCygwinTerminal(fd uintptr) bool { - return false -} diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go index 42f2514d..07e93039 100644 --- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go +++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_bsd.go @@ -16,3 +16,9 @@ func IsTerminal(fd uintptr) bool { _, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0) return err == 0 } + +// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2 +// terminal. This is also always false on this environment. +func IsCygwinTerminal(fd uintptr) bool { + return false +} diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go index 7384cf99..4f8af465 100644 --- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go +++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux.go @@ -1,18 +1,19 @@ // +build linux -// +build !appengine,!ppc64,!ppc64le +// +build !appengine +// +build !android package isatty -import ( - "syscall" - "unsafe" -) - -const ioctlReadTermios = syscall.TCGETS +import "golang.org/x/sys/unix" // IsTerminal return true if the file descriptor is terminal. func IsTerminal(fd uintptr) bool { - var termios syscall.Termios - _, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0) - return err == 0 + _, err := unix.IoctlGetTermios(int(fd), unix.TCGETS) + return err == nil +} + +// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2 +// terminal. This is also always false on this environment. +func IsCygwinTerminal(fd uintptr) bool { + return false } diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go deleted file mode 100644 index 44e5d213..00000000 --- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_linux_ppc64x.go +++ /dev/null @@ -1,19 +0,0 @@ -// +build linux -// +build ppc64 ppc64le - -package isatty - -import ( - "unsafe" - - syscall "golang.org/x/sys/unix" -) - -const ioctlReadTermios = syscall.TCGETS - -// IsTerminal return true if the file descriptor is terminal. -func IsTerminal(fd uintptr) bool { - var termios syscall.Termios - _, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0) - return err == 0 -} diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go index ff4de3d9..f02849c5 100644 --- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go +++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_others.go @@ -1,8 +1,13 @@ -// +build !windows -// +build !appengine +// +build appengine js package isatty +// IsTerminal returns true if the file descriptor is terminal which +// is always false on js and appengine classic which is a sandboxed PaaS. +func IsTerminal(fd uintptr) bool { + return false +} + // IsCygwinTerminal() return true if the file descriptor is a cygwin or msys2 // terminal. This is also always false on this environment. func IsCygwinTerminal(fd uintptr) bool { diff --git a/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go b/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go index 1f0c6bf5..bdd5c79a 100644 --- a/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go +++ b/src/dma/vendor/github.com/mattn/go-isatty/isatty_solaris.go @@ -14,3 +14,9 @@ func IsTerminal(fd uintptr) bool { err := unix.IoctlSetTermio(int(fd), unix.TCGETA, &termio) return err == nil } + +// IsCygwinTerminal return true if the file descriptor is a cygwin or msys2 +// terminal. This is also always false on this environment. +func IsCygwinTerminal(fd uintptr) bool { + return false +} diff --git a/src/dma/vendor/github.com/streadway/amqp/.gitignore b/src/dma/vendor/github.com/streadway/amqp/.gitignore index ba8a7056..667fb50c 100644 --- a/src/dma/vendor/github.com/streadway/amqp/.gitignore +++ b/src/dma/vendor/github.com/streadway/amqp/.gitignore @@ -2,3 +2,11 @@ certs/* spec/spec examples/simple-consumer/simple-consumer examples/simple-producer/simple-producer + +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +.idea/**/contentModel.xml diff --git a/src/dma/vendor/github.com/streadway/amqp/.travis.yml b/src/dma/vendor/github.com/streadway/amqp/.travis.yml index 7166964c..2d22a7af 100644 --- a/src/dma/vendor/github.com/streadway/amqp/.travis.yml +++ b/src/dma/vendor/github.com/streadway/amqp/.travis.yml @@ -1,17 +1,18 @@ language: go go: - - 1.9.x - 1.10.x + - 1.11.x + - 1.12.x services: - rabbitmq env: - - AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ + - GO111MODULE=on AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ before_install: - - go get -v github.com/golang/lint/golint + - go get -v golang.org/x/lint/golint script: - ./pre-commit diff --git a/src/dma/vendor/github.com/streadway/amqp/LICENSE b/src/dma/vendor/github.com/streadway/amqp/LICENSE index 243c0ce7..07b89680 100644 --- a/src/dma/vendor/github.com/streadway/amqp/LICENSE +++ b/src/dma/vendor/github.com/streadway/amqp/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +Copyright (c) 2012-2019, Sean Treadway, SoundCloud Ltd. All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/src/dma/vendor/github.com/streadway/amqp/README.md b/src/dma/vendor/github.com/streadway/amqp/README.md index 099db276..287830b2 100644 --- a/src/dma/vendor/github.com/streadway/amqp/README.md +++ b/src/dma/vendor/github.com/streadway/amqp/README.md @@ -15,7 +15,7 @@ enhancements. ## Supported Go Versions -This library supports two most recent Go release series, currently 1.8 and 1.9. +This library supports two most recent Go release series, currently 1.10 and 1.11. ## Supported RabbitMQ Versions diff --git a/src/dma/vendor/github.com/streadway/amqp/auth.go b/src/dma/vendor/github.com/streadway/amqp/auth.go index ebc765b6..435c94b1 100644 --- a/src/dma/vendor/github.com/streadway/amqp/auth.go +++ b/src/dma/vendor/github.com/streadway/amqp/auth.go @@ -32,6 +32,22 @@ func (auth *PlainAuth) Response() string { return fmt.Sprintf("\000%s\000%s", auth.Username, auth.Password) } +// AMQPlainAuth is similar to PlainAuth +type AMQPlainAuth struct { + Username string + Password string +} + +// Mechanism returns "AMQPLAIN" +func (auth *AMQPlainAuth) Mechanism() string { + return "AMQPLAIN" +} + +// Response returns the null character delimited encoding for the SASL PLAIN Mechanism. +func (auth *AMQPlainAuth) Response() string { + return fmt.Sprintf("LOGIN:%sPASSWORD:%s", auth.Username, auth.Password) +} + // Finds the first mechanism preferred by the client that the server supports. func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) { for _, auth = range client { diff --git a/src/dma/vendor/github.com/streadway/amqp/channel.go b/src/dma/vendor/github.com/streadway/amqp/channel.go index dd2552ca..3898ed78 100644 --- a/src/dma/vendor/github.com/streadway/amqp/channel.go +++ b/src/dma/vendor/github.com/streadway/amqp/channel.go @@ -889,7 +889,7 @@ and exchanges will also be restored on server restart. If the binding could not complete, an error will be returned and the channel will be closed. -When noWait is true and the queue could not be bound, the channel will be +When noWait is false and the queue could not be bound, the channel will be closed with an error. */ @@ -1580,6 +1580,9 @@ multiple messages, reducing the amount of protocol messages to exchange. See also Delivery.Reject */ func (ch *Channel) Reject(tag uint64, requeue bool) error { + ch.m.Lock() + defer ch.m.Unlock() + return ch.send(&basicReject{ DeliveryTag: tag, Requeue: requeue, diff --git a/src/dma/vendor/github.com/streadway/amqp/connection.go b/src/dma/vendor/github.com/streadway/amqp/connection.go index ca1372d0..b9d8e8ee 100644 --- a/src/dma/vendor/github.com/streadway/amqp/connection.go +++ b/src/dma/vendor/github.com/streadway/amqp/connection.go @@ -111,21 +111,23 @@ type readDeadliner interface { SetReadDeadline(time.Time) error } -// defaultDial establishes a connection when config.Dial is not provided -func defaultDial(network, addr string) (net.Conn, error) { - conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout) - if err != nil { - return nil, err - } +// DefaultDial establishes a connection when config.Dial is not provided +func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) { + return func(network, addr string) (net.Conn, error) { + conn, err := net.DialTimeout(network, addr, connectionTimeout) + if err != nil { + return nil, err + } - // Heartbeating hasn't started yet, don't stall forever on a dead server. - // A deadline is set for TLS and AMQP handshaking. After AMQP is established, - // the deadline is cleared in openComplete. - if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil { - return nil, err - } + // Heartbeating hasn't started yet, don't stall forever on a dead server. + // A deadline is set for TLS and AMQP handshaking. After AMQP is established, + // the deadline is cleared in openComplete. + if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil { + return nil, err + } - return conn, nil + return conn, nil + } } // Dial accepts a string in the AMQP URI format and returns a new Connection @@ -180,7 +182,7 @@ func DialConfig(url string, config Config) (*Connection, error) { dialer := config.Dial if dialer == nil { - dialer = defaultDial + dialer = DefaultDial(defaultConnectionTimeout) } conn, err = dialer("tcp", addr) @@ -201,6 +203,7 @@ func DialConfig(url string, config Config) (*Connection, error) { client := tls.Client(conn, config.TLSClientConfig) if err := client.Handshake(); err != nil { + conn.Close() return nil, err } @@ -317,7 +320,7 @@ including the underlying io, Channels, Notify listeners and Channel consumers will also be closed. */ func (c *Connection) Close() error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -332,7 +335,7 @@ func (c *Connection) Close() error { } func (c *Connection) closeWith(err *Error) error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -346,12 +349,14 @@ func (c *Connection) closeWith(err *Error) error { ) } -func (c *Connection) isClosed() bool { +// IsClosed returns true if the connection is marked as closed, otherwise false +// is returned. +func (c *Connection) IsClosed() bool { return (atomic.LoadInt32(&c.closed) == 1) } func (c *Connection) send(f frame) error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -591,7 +596,7 @@ func (c *Connection) allocateChannel() (*Channel, error) { c.m.Lock() defer c.m.Unlock() - if c.isClosed() { + if c.IsClosed() { return nil, ErrClosed } diff --git a/src/dma/vendor/github.com/streadway/amqp/delivery.go b/src/dma/vendor/github.com/streadway/amqp/delivery.go index 304c8346..72412644 100644 --- a/src/dma/vendor/github.com/streadway/amqp/delivery.go +++ b/src/dma/vendor/github.com/streadway/amqp/delivery.go @@ -52,7 +52,7 @@ type Delivery struct { DeliveryTag uint64 Redelivered bool - Exchange string // basic.publish exhange + Exchange string // basic.publish exchange RoutingKey string // basic.publish routing key Body []byte diff --git a/src/dma/vendor/github.com/streadway/amqp/go.mod b/src/dma/vendor/github.com/streadway/amqp/go.mod new file mode 100644 index 00000000..4eeab334 --- /dev/null +++ b/src/dma/vendor/github.com/streadway/amqp/go.mod @@ -0,0 +1,3 @@ +module github.com/streadway/amqp + +go 1.10 diff --git a/src/dma/vendor/github.com/streadway/amqp/pre-commit b/src/dma/vendor/github.com/streadway/amqp/pre-commit index 7607f467..37155300 100755 --- a/src/dma/vendor/github.com/streadway/amqp/pre-commit +++ b/src/dma/vendor/github.com/streadway/amqp/pre-commit @@ -1,29 +1,67 @@ #!/bin/sh -GOFMT_FILES=$(gofmt -l .) -if [ -n "${GOFMT_FILES}" ]; then - printf >&2 'gofmt failed for the following files:\n%s\n\nplease run "gofmt -w ." on your changes before committing.\n' "${GOFMT_FILES}" - exit 1 -fi - -GOLINT_ERRORS=$(golint ./... | grep -v "Id should be") -if [ -n "${GOLINT_ERRORS}" ]; then - printf >&2 'golint failed for the following reasons:\n%s\n\nplease run 'golint ./...' on your changes before committing.\n' "${GOLINT_ERRORS}" - exit 1 -fi - -GOVET_ERRORS=$(go tool vet *.go 2>&1) -if [ -n "${GOVET_ERRORS}" ]; then - printf >&2 'go vet failed for the following reasons:\n%s\n\nplease run "go tool vet *.go" on your changes before committing.\n' "${GOVET_ERRORS}" - exit 1 -fi - -if [ -z "${NOTEST}" ]; then - printf >&2 'Running short tests...\n' - env AMQP_URL= go test -short -v | egrep 'PASS|ok' - - if [ $? -ne 0 ]; then - printf >&2 'go test failed, please fix before committing.\n' +LATEST_STABLE_SUPPORTED_GO_VERSION="1.11" + +main() { + if local_go_version_is_latest_stable + then + run_gofmt + run_golint + run_govet + fi + run_unit_tests +} + +local_go_version_is_latest_stable() { + go version | grep -q $LATEST_STABLE_SUPPORTED_GO_VERSION +} + +log_error() { + echo "$*" 1>&2 +} + +run_gofmt() { + GOFMT_FILES=$(gofmt -l .) + if [ -n "$GOFMT_FILES" ] + then + log_error "gofmt failed for the following files: +$GOFMT_FILES + +please run 'gofmt -w .' on your changes before committing." exit 1 fi -fi +} + +run_golint() { + GOLINT_ERRORS=$(golint ./... | grep -v "Id should be") + if [ -n "$GOLINT_ERRORS" ] + then + log_error "golint failed for the following reasons: +$GOLINT_ERRORS + +please run 'golint ./...' on your changes before committing." + exit 1 + fi +} + +run_govet() { + GOVET_ERRORS=$(go tool vet ./*.go 2>&1) + if [ -n "$GOVET_ERRORS" ] + then + log_error "go vet failed for the following reasons: +$GOVET_ERRORS + +please run 'go tool vet ./*.go' on your changes before committing." + exit 1 + fi +} + +run_unit_tests() { + if [ -z "$NOTEST" ] + then + log_error 'Running short tests...' + env AMQP_URL= go test -short + fi +} + +main diff --git a/src/dma/vendor/github.com/streadway/amqp/types.go b/src/dma/vendor/github.com/streadway/amqp/types.go index ff5ea3cb..d3ece707 100644 --- a/src/dma/vendor/github.com/streadway/amqp/types.go +++ b/src/dma/vendor/github.com/streadway/amqp/types.go @@ -200,6 +200,7 @@ type Decimal struct { // byte // float32 // float64 +// int // int16 // int32 // int64 @@ -225,7 +226,7 @@ type Table map[string]interface{} func validateField(f interface{}) error { switch fv := f.(type) { - case nil, bool, byte, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: + case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: return nil case []interface{}: diff --git a/src/dma/vendor/github.com/streadway/amqp/uri.go b/src/dma/vendor/github.com/streadway/amqp/uri.go index 35fefdc2..e5847154 100644 --- a/src/dma/vendor/github.com/streadway/amqp/uri.go +++ b/src/dma/vendor/github.com/streadway/amqp/uri.go @@ -125,6 +125,15 @@ func (uri URI) PlainAuth() *PlainAuth { } } +// AMQPlainAuth returns a PlainAuth structure based on the parsed URI's +// Username and Password fields. +func (uri URI) AMQPlainAuth() *AMQPlainAuth { + return &AMQPlainAuth{ + Username: uri.Username, + Password: uri.Password, + } +} + func (uri URI) String() string { authority, err := url.Parse("") if err != nil { diff --git a/src/dma/vendor/github.com/streadway/amqp/write.go b/src/dma/vendor/github.com/streadway/amqp/write.go index 58ed20d6..94a46d11 100644 --- a/src/dma/vendor/github.com/streadway/amqp/write.go +++ b/src/dma/vendor/github.com/streadway/amqp/write.go @@ -308,6 +308,11 @@ func writeField(w io.Writer, value interface{}) (err error) { binary.BigEndian.PutUint16(buf[1:3], uint16(v)) enc = buf[:3] + case int: + buf[0] = 'I' + binary.BigEndian.PutUint32(buf[1:5], uint32(v)) + enc = buf[:5] + case int32: buf[0] = 'I' binary.BigEndian.PutUint32(buf[1:5], uint32(v)) diff --git a/src/dma/vendor/github.com/valyala/fasttemplate/go.mod b/src/dma/vendor/github.com/valyala/fasttemplate/go.mod new file mode 100644 index 00000000..6015c4b5 --- /dev/null +++ b/src/dma/vendor/github.com/valyala/fasttemplate/go.mod @@ -0,0 +1,3 @@ +module github.com/valyala/fasttemplate + +require github.com/valyala/bytebufferpool v1.0.0 diff --git a/src/dma/vendor/github.com/valyala/fasttemplate/go.sum b/src/dma/vendor/github.com/valyala/fasttemplate/go.sum new file mode 100644 index 00000000..c10c48c2 --- /dev/null +++ b/src/dma/vendor/github.com/valyala/fasttemplate/go.sum @@ -0,0 +1,2 @@ +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -- cgit 1.2.3-korg