aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/go-redis/redis/command.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/command.go')
-rw-r--r--src/dma/vendor/github.com/go-redis/redis/command.go993
1 files changed, 852 insertions, 141 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/command.go b/src/dma/vendor/github.com/go-redis/redis/command.go
index 11472bec..cb4f94b1 100644
--- a/src/dma/vendor/github.com/go-redis/redis/command.go
+++ b/src/dma/vendor/github.com/go-redis/redis/command.go
@@ -1,16 +1,14 @@
package redis
import (
- "bytes"
"fmt"
+ "net"
"strconv"
"strings"
"time"
"github.com/go-redis/redis/internal"
- "github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
- "github.com/go-redis/redis/internal/util"
)
type Cmder interface {
@@ -18,13 +16,12 @@ type Cmder interface {
Args() []interface{}
stringArg(int) string
- readReply(*pool.Conn) error
+ readReply(rd *proto.Reader) error
setErr(error)
readTimeout() *time.Duration
Err() error
- fmt.Stringer
}
func setCmdsErr(cmds []Cmder, e error) {
@@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) {
}
}
-func firstCmdsErr(cmds []Cmder) error {
+func cmdsFirstErr(cmds []Cmder) error {
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
@@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error {
return nil
}
-func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
- cn.Wb.Reset()
+func writeCmd(wr *proto.Writer, cmds ...Cmder) error {
for _, cmd := range cmds {
- if err := cn.Wb.Append(cmd.Args()); err != nil {
+ err := wr.WriteArgs(cmd.Args())
+ if err != nil {
return err
}
}
-
- _, err := cn.Write(cn.Wb.Bytes())
- return err
+ return nil
}
func cmdString(cmd Cmder, val interface{}) string {
@@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) {
return cmd.val, cmd.err
}
-func (cmd *Cmd) String() string {
- return cmdString(cmd, cmd.val)
+func (cmd *Cmd) String() (string, error) {
+ if cmd.err != nil {
+ return "", cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case string:
+ return val, nil
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for String", val)
+ return "", err
+ }
}
-func (cmd *Cmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser)
+func (cmd *Cmd) Int() (int, error) {
if cmd.err != nil {
- return cmd.err
+ return 0, cmd.err
}
- if b, ok := cmd.val.([]byte); ok {
- // Bytes must be copied, because underlying memory is reused.
- cmd.val = string(b)
+ switch val := cmd.val.(type) {
+ case int64:
+ return int(val), nil
+ case string:
+ return strconv.Atoi(val)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Int", val)
+ return 0, err
}
- return nil
+}
+
+func (cmd *Cmd) Int64() (int64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return val, nil
+ case string:
+ return strconv.ParseInt(val, 10, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Uint64() (uint64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return uint64(val), nil
+ case string:
+ return strconv.ParseUint(val, 10, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Uint64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Float64() (float64, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return float64(val), nil
+ case string:
+ return strconv.ParseFloat(val, 64)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Float64", val)
+ return 0, err
+ }
+}
+
+func (cmd *Cmd) Bool() (bool, error) {
+ if cmd.err != nil {
+ return false, cmd.err
+ }
+ switch val := cmd.val.(type) {
+ case int64:
+ return val != 0, nil
+ case string:
+ return strconv.ParseBool(val)
+ default:
+ err := fmt.Errorf("redis: unexpected type=%T for Bool", val)
+ return false, err
+ }
+}
+
+func (cmd *Cmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadReply(sliceParser)
+ return cmd.err
+}
+
+// Implements proto.MultiBulkParse
+func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ vals := make([]interface{}, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(sliceParser)
+ if err != nil {
+ if err == Nil {
+ vals = append(vals, nil)
+ continue
+ }
+ if err, ok := err.(proto.RedisError); ok {
+ vals = append(vals, err)
+ continue
+ }
+ return nil, err
+ }
+
+ switch v := v.(type) {
+ case string:
+ vals = append(vals, v)
+ default:
+ vals = append(vals, v)
+ }
+ }
+ return vals, nil
}
//------------------------------------------------------------------------------
@@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *SliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *SliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(sliceParser)
+ v, cmd.err = rd.ReadArrayReply(sliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StatusCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadStringReply()
+func (cmd *StatusCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadString()
return cmd.err
}
@@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *IntCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadIntReply()
+func (cmd *IntCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadIntReply()
return cmd.err
}
@@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *DurationCmd) readReply(cn *pool.Conn) error {
+func (cmd *DurationCmd) readReply(rd *proto.Reader) error {
var n int64
- n, cmd.err = cn.Rd.ReadIntReply()
+ n, cmd.err = rd.ReadIntReply()
if cmd.err != nil {
return cmd.err
}
@@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
+func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(timeParser)
+ v, cmd.err = rd.ReadArrayReply(timeParser)
if cmd.err != nil {
return cmd.err
}
@@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d elements, expected 2", n)
+ }
+
+ sec, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ microsec, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ return time.Unix(sec, microsec*1000), nil
+}
+
//------------------------------------------------------------------------------
type BoolCmd struct {
@@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string {
return cmdString(cmd, cmd.val)
}
-var ok = []byte("OK")
-
-func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadReply(nil)
+ v, cmd.err = rd.ReadReply(nil)
// `SET key value NX` returns nil when key already exists. But
// `SETNX key value` returns bool (0/1). So convert nil to bool.
// TODO: is this okay?
@@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
case int64:
cmd.val = v == 1
return nil
- case []byte:
- cmd.val = bytes.Equal(v, ok)
+ case string:
+ cmd.val = v == "OK"
return nil
default:
cmd.err = fmt.Errorf("got %T, wanted int64 or string", v)
@@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
type StringCmd struct {
baseCmd
- val []byte
+ val string
}
var _ Cmder = (*StringCmd)(nil)
@@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd {
}
func (cmd *StringCmd) Val() string {
- return util.BytesToString(cmd.val)
+ return cmd.val
}
func (cmd *StringCmd) Result() (string, error) {
@@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) {
}
func (cmd *StringCmd) Bytes() ([]byte, error) {
- return cmd.val, cmd.err
+ return []byte(cmd.val), cmd.err
+}
+
+func (cmd *StringCmd) Int() (int, error) {
+ if cmd.err != nil {
+ return 0, cmd.err
+ }
+ return strconv.Atoi(cmd.Val())
}
func (cmd *StringCmd) Int64() (int64, error) {
@@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error {
if cmd.err != nil {
return cmd.err
}
- return proto.Scan(cmd.val, val)
+ return proto.Scan([]byte(cmd.val), val)
}
func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadBytesReply()
+func (cmd *StringCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadString()
return cmd.err
}
@@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *FloatCmd) readReply(cn *pool.Conn) error {
- cmd.val, cmd.err = cn.Rd.ReadFloatReply()
+func (cmd *FloatCmd) readReply(rd *proto.Reader) error {
+ cmd.val, cmd.err = rd.ReadFloatReply()
return cmd.err
}
@@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
return proto.ScanSlice(cmd.Val(), container)
}
-func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser)
+ v, cmd.err = rd.ReadArrayReply(stringSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ ss := make([]string, 0, n)
+ for i := int64(0); i < n; i++ {
+ s, err := rd.ReadString()
+ if err == Nil {
+ ss = append(ss, "")
+ } else if err != nil {
+ return nil, err
+ } else {
+ ss = append(ss, s)
+ }
+ }
+ return ss, nil
+}
+
//------------------------------------------------------------------------------
type BoolSliceCmd struct {
@@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser)
+ v, cmd.err = rd.ReadArrayReply(boolSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ bools := make([]bool, 0, n)
+ for i := int64(0); i < n; i++ {
+ n, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ bools = append(bools, n == 1)
+ }
+ return bools, nil
+}
+
//------------------------------------------------------------------------------
type StringStringMapCmd struct {
@@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringStringMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]string, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = value
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
type StringIntMapCmd struct {
@@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringIntMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]int64, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ n, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = n
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
type StringStructMapCmd struct {
@@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser)
+ v, cmd.err = rd.ReadArrayReply(stringStructMapParser)
if cmd.err != nil {
return cmd.err
}
@@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
return nil
}
-//------------------------------------------------------------------------------
+// Implements proto.MultiBulkParse
+func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]struct{}, n)
+ for i := int64(0); i < n; i++ {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
-type XStream struct {
- Stream string
- Messages []*XMessage
+ m[key] = struct{}{}
+ }
+ return m, nil
}
+//------------------------------------------------------------------------------
+
type XMessage struct {
ID string
Values map[string]interface{}
}
+type XMessageSliceCmd struct {
+ baseCmd
+
+ val []XMessage
+}
+
+var _ Cmder = (*XMessageSliceCmd)(nil)
+
+func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
+ return &XMessageSliceCmd{
+ baseCmd: baseCmd{_args: args},
+ }
+}
+
+func (cmd *XMessageSliceCmd) Val() []XMessage {
+ return cmd.val
+}
+
+func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *XMessageSliceCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
+ var v interface{}
+ v, cmd.err = rd.ReadArrayReply(xMessageSliceParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = v.([]XMessage)
+ return nil
+}
+
+// Implements proto.MultiBulkParse
+func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ msgs := make([]XMessage, 0, n)
+ for i := int64(0); i < n; i++ {
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ v, err := rd.ReadArrayReply(stringInterfaceMapParser)
+ if err != nil {
+ return nil, err
+ }
+
+ msgs = append(msgs, XMessage{
+ ID: id,
+ Values: v.(map[string]interface{}),
+ })
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+ return msgs, nil
+}
+
+// Implements proto.MultiBulkParse
+func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]interface{}, n/2)
+ for i := int64(0); i < n; i += 2 {
+ key, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ value, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = value
+ }
+ return m, nil
+}
+
//------------------------------------------------------------------------------
+type XStream struct {
+ Stream string
+ Messages []XMessage
+}
+
type XStreamSliceCmd struct {
baseCmd
- val []*XStream
+ val []XStream
}
var _ Cmder = (*XStreamSliceCmd)(nil)
@@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd {
}
}
-func (cmd *XStreamSliceCmd) Val() []*XStream {
+func (cmd *XStreamSliceCmd) Val() []XStream {
return cmd.val
}
-func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) {
+func (cmd *XStreamSliceCmd) Result() ([]XStream, error) {
return cmd.val, cmd.err
}
@@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser)
+ v, cmd.err = rd.ReadArrayReply(xStreamSliceParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]*XStream)
+ cmd.val = v.([]XStream)
return nil
}
// Implements proto.MultiBulkParse
func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- xx := make([]*XStream, n)
+ ret := make([]XStream, 0, n)
for i := int64(0); i < n; i++ {
- v, err := rd.ReadArrayReply(xStreamParser)
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d, wanted 2", n)
+ }
+
+ stream, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ v, err := rd.ReadArrayReply(xMessageSliceParser)
+ if err != nil {
+ return nil, err
+ }
+
+ ret = append(ret, XStream{
+ Stream: stream,
+ Messages: v.([]XMessage),
+ })
+ return nil, nil
+ })
if err != nil {
return nil, err
}
- xx[i] = v.(*XStream)
}
- return xx, nil
+ return ret, nil
}
-// Implements proto.MultiBulkParse
-func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
- if n != 2 {
- return nil, fmt.Errorf("got %d, wanted 2", n)
+//------------------------------------------------------------------------------
+
+type XPending struct {
+ Count int64
+ Lower string
+ Higher string
+ Consumers map[string]int64
+}
+
+type XPendingCmd struct {
+ baseCmd
+ val *XPending
+}
+
+var _ Cmder = (*XPendingCmd)(nil)
+
+func NewXPendingCmd(args ...interface{}) *XPendingCmd {
+ return &XPendingCmd{
+ baseCmd: baseCmd{_args: args},
}
+}
+
+func (cmd *XPendingCmd) Val() *XPending {
+ return cmd.val
+}
+
+func (cmd *XPendingCmd) Result() (*XPending, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *XPendingCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
+ var info interface{}
+ info, cmd.err = rd.ReadArrayReply(xPendingParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = info.(*XPending)
+ return nil
+}
- stream, err := rd.ReadStringReply()
+func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 4 {
+ return nil, fmt.Errorf("got %d, wanted 4", n)
+ }
+
+ count, err := rd.ReadIntReply()
if err != nil {
return nil, err
}
- v, err := rd.ReadArrayReply(xMessageSliceParser)
- if err != nil {
+ lower, err := rd.ReadString()
+ if err != nil && err != Nil {
return nil, err
}
- return &XStream{
- Stream: stream,
- Messages: v.([]*XMessage),
- }, nil
+ higher, err := rd.ReadString()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ pending := &XPending{
+ Count: count,
+ Lower: lower,
+ Higher: higher,
+ }
+ _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ for i := int64(0); i < n; i++ {
+ _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 2 {
+ return nil, fmt.Errorf("got %d, wanted 2", n)
+ }
+
+ consumerName, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ consumerPending, err := rd.ReadInt()
+ if err != nil {
+ return nil, err
+ }
+
+ if pending.Consumers == nil {
+ pending.Consumers = make(map[string]int64)
+ }
+ pending.Consumers[consumerName] = consumerPending
+
+ return nil, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+ return nil, nil
+ })
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ return pending, nil
}
//------------------------------------------------------------------------------
-type XMessageSliceCmd struct {
- baseCmd
+type XPendingExt struct {
+ Id string
+ Consumer string
+ Idle time.Duration
+ RetryCount int64
+}
- val []*XMessage
+type XPendingExtCmd struct {
+ baseCmd
+ val []XPendingExt
}
-var _ Cmder = (*XMessageSliceCmd)(nil)
+var _ Cmder = (*XPendingExtCmd)(nil)
-func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
- return &XMessageSliceCmd{
+func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd {
+ return &XPendingExtCmd{
baseCmd: baseCmd{_args: args},
}
}
-func (cmd *XMessageSliceCmd) Val() []*XMessage {
+func (cmd *XPendingExtCmd) Val() []XPendingExt {
return cmd.val
}
-func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) {
+func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) {
return cmd.val, cmd.err
}
-func (cmd *XMessageSliceCmd) String() string {
+func (cmd *XPendingExtCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
- var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
+func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
+ var info interface{}
+ info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]*XMessage)
+ cmd.val = info.([]XPendingExt)
return nil
}
-// Implements proto.MultiBulkParse
-func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
- msgs := make([]*XMessage, n)
+func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ ret := make([]XPendingExt, 0, n)
for i := int64(0); i < n; i++ {
- v, err := rd.ReadArrayReply(xMessageParser)
+ _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 4 {
+ return nil, fmt.Errorf("got %d, wanted 4", n)
+ }
+
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ consumer, err := rd.ReadString()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ idle, err := rd.ReadIntReply()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ retryCount, err := rd.ReadIntReply()
+ if err != nil && err != Nil {
+ return nil, err
+ }
+
+ ret = append(ret, XPendingExt{
+ Id: id,
+ Consumer: consumer,
+ Idle: time.Duration(idle) * time.Millisecond,
+ RetryCount: retryCount,
+ })
+ return nil, nil
+ })
if err != nil {
return nil, err
}
- msgs[i] = v.(*XMessage)
}
- return msgs, nil
+ return ret, nil
}
-// Implements proto.MultiBulkParse
-func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) {
- id, err := rd.ReadStringReply()
- if err != nil {
- return nil, err
- }
+//------------------------------------------------------------------------------
- v, err := rd.ReadArrayReply(xKeyValueParser)
- if err != nil {
- return nil, err
+//------------------------------------------------------------------------------
+
+type ZSliceCmd struct {
+ baseCmd
+
+ val []Z
+}
+
+var _ Cmder = (*ZSliceCmd)(nil)
+
+func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
+ return &ZSliceCmd{
+ baseCmd: baseCmd{_args: args},
}
+}
+
+func (cmd *ZSliceCmd) Val() []Z {
+ return cmd.val
+}
- return &XMessage{
- ID: id,
- Values: v.(map[string]interface{}),
- }, nil
+func (cmd *ZSliceCmd) Result() ([]Z, error) {
+ return cmd.val, cmd.err
+}
+
+func (cmd *ZSliceCmd) String() string {
+ return cmdString(cmd, cmd.val)
+}
+
+func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
+ var v interface{}
+ v, cmd.err = rd.ReadArrayReply(zSliceParser)
+ if cmd.err != nil {
+ return cmd.err
+ }
+ cmd.val = v.([]Z)
+ return nil
}
// Implements proto.MultiBulkParse
-func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) {
- values := make(map[string]interface{}, n)
+func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 {
- key, err := rd.ReadStringReply()
+ var err error
+
+ z := &zz[i/2]
+
+ z.Member, err = rd.ReadString()
if err != nil {
return nil, err
}
- value, err := rd.ReadStringReply()
+ z.Score, err = rd.ReadFloatReply()
if err != nil {
return nil, err
}
-
- values[key] = value
}
- return values, nil
+ return zz, nil
}
//------------------------------------------------------------------------------
-type ZSliceCmd struct {
+type ZWithKeyCmd struct {
baseCmd
- val []Z
+ val ZWithKey
}
-var _ Cmder = (*ZSliceCmd)(nil)
+var _ Cmder = (*ZWithKeyCmd)(nil)
-func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
- return &ZSliceCmd{
+func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
+ return &ZWithKeyCmd{
baseCmd: baseCmd{_args: args},
}
}
-func (cmd *ZSliceCmd) Val() []Z {
+func (cmd *ZWithKeyCmd) Val() ZWithKey {
return cmd.val
}
-func (cmd *ZSliceCmd) Result() ([]Z, error) {
- return cmd.val, cmd.err
+func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
+ return cmd.Val(), cmd.Err()
}
-func (cmd *ZSliceCmd) String() string {
+func (cmd *ZWithKeyCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser)
+ v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
if cmd.err != nil {
return cmd.err
}
- cmd.val = v.([]Z)
+ cmd.val = v.(ZWithKey)
return nil
}
+// Implements proto.MultiBulkParse
+func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
+ if n != 3 {
+ return nil, fmt.Errorf("got %d elements, expected 3", n)
+ }
+
+ var z ZWithKey
+ var err error
+
+ z.Key, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ z.Member, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ z.Score, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ return z, nil
+}
+
//------------------------------------------------------------------------------
type ScanCmd struct {
@@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.page)
}
-func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
- cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply()
+func (cmd *ScanCmd) readReply(rd *proto.Reader) error {
+ cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply()
return cmd.err
}
@@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
+func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser)
+ v, cmd.err = rd.ReadArrayReply(clusterSlotsParser)
if cmd.err != nil {
return cmd.err
}
@@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
+ slots := make([]ClusterSlot, n)
+ for i := 0; i < len(slots); i++ {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n < 2 {
+ err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
+ return nil, err
+ }
+
+ start, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ end, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes := make([]ClusterNode, n-2)
+ for j := 0; j < len(nodes); j++ {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n != 2 && n != 3 {
+ err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
+ return nil, err
+ }
+
+ ip, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ port, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes[j].Addr = net.JoinHostPort(ip, port)
+
+ if n == 3 {
+ id, err := rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ nodes[j].Id = id
+ }
+ }
+
+ slots[i] = ClusterSlot{
+ Start: int(start),
+ End: int(end),
+ Nodes: nodes,
+ }
+ }
+ return slots, nil
+}
+
//------------------------------------------------------------------------------
// GeoLocation is used with GeoAdd to add geospatial location.
@@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string {
return cmdString(cmd, cmd.locations)
}
-func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
+ v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
if cmd.err != nil {
return cmd.err
}
@@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
return nil
}
+func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+ return func(rd *proto.Reader, n int64) (interface{}, error) {
+ var loc GeoLocation
+ var err error
+
+ loc.Name, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+ if q.WithDist {
+ loc.Dist, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if q.WithGeoHash {
+ loc.GeoHash, err = rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+ if q.WithCoord {
+ n, err := rd.ReadArrayLen()
+ if err != nil {
+ return nil, err
+ }
+ if n != 2 {
+ return nil, fmt.Errorf("got %d coordinates, expected 2", n)
+ }
+
+ loc.Longitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ loc.Latitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return &loc, nil
+ }
+}
+
+func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+ return func(rd *proto.Reader, n int64) (interface{}, error) {
+ locs := make([]GeoLocation, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(newGeoLocationParser(q))
+ if err != nil {
+ return nil, err
+ }
+ switch vv := v.(type) {
+ case string:
+ locs = append(locs, GeoLocation{
+ Name: vv,
+ })
+ case *GeoLocation:
+ locs = append(locs, *vv)
+ default:
+ return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
+ }
+ }
+ return locs, nil
+ }
+}
+
//------------------------------------------------------------------------------
type GeoPos struct {
@@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string {
return cmdString(cmd, cmd.positions)
}
-func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser)
+ v, cmd.err = rd.ReadArrayReply(geoPosSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
return nil
}
+func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ positions := make([]*GeoPos, 0, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(geoPosParser)
+ if err != nil {
+ if err == Nil {
+ positions = append(positions, nil)
+ continue
+ }
+ return nil, err
+ }
+ switch v := v.(type) {
+ case *GeoPos:
+ positions = append(positions, v)
+ default:
+ return nil, fmt.Errorf("got %T, expected *GeoPos", v)
+ }
+ }
+ return positions, nil
+}
+
+func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
+ var pos GeoPos
+ var err error
+
+ pos.Longitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+
+ pos.Latitude, err = rd.ReadFloatReply()
+ if err != nil {
+ return nil, err
+ }
+
+ return &pos, nil
+}
+
//------------------------------------------------------------------------------
type CommandInfo struct {
@@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string {
return cmdString(cmd, cmd.val)
}
-func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
+func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
var v interface{}
- v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser)
+ v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser)
if cmd.err != nil {
return cmd.err
}
@@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
return nil
}
+// Implements proto.MultiBulkParse
+func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+ m := make(map[string]*CommandInfo, n)
+ for i := int64(0); i < n; i++ {
+ v, err := rd.ReadReply(commandInfoParser)
+ if err != nil {
+ return nil, err
+ }
+ vv := v.(*CommandInfo)
+ m[vv.Name] = vv
+
+ }
+ return m, nil
+}
+
+func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
+ var cmd CommandInfo
+ var err error
+
+ if n != 6 {
+ return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
+ }
+
+ cmd.Name, err = rd.ReadString()
+ if err != nil {
+ return nil, err
+ }
+
+ arity, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.Arity = int8(arity)
+
+ flags, err := rd.ReadReply(stringSliceParser)
+ if err != nil {
+ return nil, err
+ }
+ cmd.Flags = flags.([]string)
+
+ firstKeyPos, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.FirstKeyPos = int8(firstKeyPos)
+
+ lastKeyPos, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.LastKeyPos = int8(lastKeyPos)
+
+ stepCount, err := rd.ReadIntReply()
+ if err != nil {
+ return nil, err
+ }
+ cmd.StepCount = int8(stepCount)
+
+ for _, flag := range cmd.Flags {
+ if flag == "readonly" {
+ cmd.ReadOnly = true
+ break
+ }
+ }
+
+ return &cmd, nil
+}
+
//------------------------------------------------------------------------------
type cmdsInfoCache struct {