diff options
Diffstat (limited to 'src/dma/vendor/github.com/go-redis/redis/command.go')
-rw-r--r-- | src/dma/vendor/github.com/go-redis/redis/command.go | 993 |
1 files changed, 852 insertions, 141 deletions
diff --git a/src/dma/vendor/github.com/go-redis/redis/command.go b/src/dma/vendor/github.com/go-redis/redis/command.go index 11472bec..cb4f94b1 100644 --- a/src/dma/vendor/github.com/go-redis/redis/command.go +++ b/src/dma/vendor/github.com/go-redis/redis/command.go @@ -1,16 +1,14 @@ package redis import ( - "bytes" "fmt" + "net" "strconv" "strings" "time" "github.com/go-redis/redis/internal" - "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" - "github.com/go-redis/redis/internal/util" ) type Cmder interface { @@ -18,13 +16,12 @@ type Cmder interface { Args() []interface{} stringArg(int) string - readReply(*pool.Conn) error + readReply(rd *proto.Reader) error setErr(error) readTimeout() *time.Duration Err() error - fmt.Stringer } func setCmdsErr(cmds []Cmder, e error) { @@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) { } } -func firstCmdsErr(cmds []Cmder) error { +func cmdsFirstErr(cmds []Cmder) error { for _, cmd := range cmds { if err := cmd.Err(); err != nil { return err @@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error { return nil } -func writeCmd(cn *pool.Conn, cmds ...Cmder) error { - cn.Wb.Reset() +func writeCmd(wr *proto.Writer, cmds ...Cmder) error { for _, cmd := range cmds { - if err := cn.Wb.Append(cmd.Args()); err != nil { + err := wr.WriteArgs(cmd.Args()) + if err != nil { return err } } - - _, err := cn.Write(cn.Wb.Bytes()) - return err + return nil } func cmdString(cmd Cmder, val interface{}) string { @@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) { return cmd.val, cmd.err } -func (cmd *Cmd) String() string { - return cmdString(cmd, cmd.val) +func (cmd *Cmd) String() (string, error) { + if cmd.err != nil { + return "", cmd.err + } + switch val := cmd.val.(type) { + case string: + return val, nil + default: + err := fmt.Errorf("redis: unexpected type=%T for String", val) + return "", err + } } -func (cmd *Cmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser) +func (cmd *Cmd) Int() (int, error) { if cmd.err != nil { - return cmd.err + return 0, cmd.err } - if b, ok := cmd.val.([]byte); ok { - // Bytes must be copied, because underlying memory is reused. - cmd.val = string(b) + switch val := cmd.val.(type) { + case int64: + return int(val), nil + case string: + return strconv.Atoi(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int", val) + return 0, err } - return nil +} + +func (cmd *Cmd) Int64() (int64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val, nil + case string: + return strconv.ParseInt(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Int64", val) + return 0, err + } +} + +func (cmd *Cmd) Uint64() (uint64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return uint64(val), nil + case string: + return strconv.ParseUint(val, 10, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Uint64", val) + return 0, err + } +} + +func (cmd *Cmd) Float64() (float64, error) { + if cmd.err != nil { + return 0, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return float64(val), nil + case string: + return strconv.ParseFloat(val, 64) + default: + err := fmt.Errorf("redis: unexpected type=%T for Float64", val) + return 0, err + } +} + +func (cmd *Cmd) Bool() (bool, error) { + if cmd.err != nil { + return false, cmd.err + } + switch val := cmd.val.(type) { + case int64: + return val != 0, nil + case string: + return strconv.ParseBool(val) + default: + err := fmt.Errorf("redis: unexpected type=%T for Bool", val) + return false, err + } +} + +func (cmd *Cmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadReply(sliceParser) + return cmd.err +} + +// Implements proto.MultiBulkParse +func sliceParser(rd *proto.Reader, n int64) (interface{}, error) { + vals := make([]interface{}, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(sliceParser) + if err != nil { + if err == Nil { + vals = append(vals, nil) + continue + } + if err, ok := err.(proto.RedisError); ok { + vals = append(vals, err) + continue + } + return nil, err + } + + switch v := v.(type) { + case string: + vals = append(vals, v) + default: + vals = append(vals, v) + } + } + return vals, nil } //------------------------------------------------------------------------------ @@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *SliceCmd) readReply(cn *pool.Conn) error { +func (cmd *SliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(sliceParser) + v, cmd.err = rd.ReadArrayReply(sliceParser) if cmd.err != nil { return cmd.err } @@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StatusCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadStringReply() +func (cmd *StatusCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *IntCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadIntReply() +func (cmd *IntCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadIntReply() return cmd.err } @@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *DurationCmd) readReply(cn *pool.Conn) error { +func (cmd *DurationCmd) readReply(rd *proto.Reader) error { var n int64 - n, cmd.err = cn.Rd.ReadIntReply() + n, cmd.err = rd.ReadIntReply() if cmd.err != nil { return cmd.err } @@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *TimeCmd) readReply(cn *pool.Conn) error { +func (cmd *TimeCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(timeParser) + v, cmd.err = rd.ReadArrayReply(timeParser) if cmd.err != nil { return cmd.err } @@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func timeParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d elements, expected 2", n) + } + + sec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + microsec, err := rd.ReadInt() + if err != nil { + return nil, err + } + + return time.Unix(sec, microsec*1000), nil +} + //------------------------------------------------------------------------------ type BoolCmd struct { @@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string { return cmdString(cmd, cmd.val) } -var ok = []byte("OK") - -func (cmd *BoolCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadReply(nil) + v, cmd.err = rd.ReadReply(nil) // `SET key value NX` returns nil when key already exists. But // `SETNX key value` returns bool (0/1). So convert nil to bool. // TODO: is this okay? @@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { case int64: cmd.val = v == 1 return nil - case []byte: - cmd.val = bytes.Equal(v, ok) + case string: + cmd.val = v == "OK" return nil default: cmd.err = fmt.Errorf("got %T, wanted int64 or string", v) @@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error { type StringCmd struct { baseCmd - val []byte + val string } var _ Cmder = (*StringCmd)(nil) @@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd { } func (cmd *StringCmd) Val() string { - return util.BytesToString(cmd.val) + return cmd.val } func (cmd *StringCmd) Result() (string, error) { @@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) { } func (cmd *StringCmd) Bytes() ([]byte, error) { - return cmd.val, cmd.err + return []byte(cmd.val), cmd.err +} + +func (cmd *StringCmd) Int() (int, error) { + if cmd.err != nil { + return 0, cmd.err + } + return strconv.Atoi(cmd.Val()) } func (cmd *StringCmd) Int64() (int64, error) { @@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error { if cmd.err != nil { return cmd.err } - return proto.Scan(cmd.val, val) + return proto.Scan([]byte(cmd.val), val) } func (cmd *StringCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadBytesReply() +func (cmd *StringCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadString() return cmd.err } @@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *FloatCmd) readReply(cn *pool.Conn) error { - cmd.val, cmd.err = cn.Rd.ReadFloatReply() +func (cmd *FloatCmd) readReply(rd *proto.Reader) error { + cmd.val, cmd.err = rd.ReadFloatReply() return cmd.err } @@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error { return proto.ScanSlice(cmd.Val(), container) } -func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser) + v, cmd.err = rd.ReadArrayReply(stringSliceParser) if cmd.err != nil { return cmd.err } @@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ss := make([]string, 0, n) + for i := int64(0); i < n; i++ { + s, err := rd.ReadString() + if err == Nil { + ss = append(ss, "") + } else if err != nil { + return nil, err + } else { + ss = append(ss, s) + } + } + return ss, nil +} + //------------------------------------------------------------------------------ type BoolSliceCmd struct { @@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser) + v, cmd.err = rd.ReadArrayReply(boolSliceParser) if cmd.err != nil { return cmd.err } @@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + bools := make([]bool, 0, n) + for i := int64(0); i < n; i++ { + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + bools = append(bools, n == 1) + } + return bools, nil +} + //------------------------------------------------------------------------------ type StringStringMapCmd struct { @@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser) + v, cmd.err = rd.ReadArrayReply(stringStringMapParser) if cmd.err != nil { return cmd.err } @@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]string, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ type StringIntMapCmd struct { @@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser) + v, cmd.err = rd.ReadArrayReply(stringIntMapParser) if cmd.err != nil { return cmd.err } @@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]int64, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + n, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + m[key] = n + } + return m, nil +} + //------------------------------------------------------------------------------ type StringStructMapCmd struct { @@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { +func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser) + v, cmd.err = rd.ReadArrayReply(stringStructMapParser) if cmd.err != nil { return cmd.err } @@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error { return nil } -//------------------------------------------------------------------------------ +// Implements proto.MultiBulkParse +func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]struct{}, n) + for i := int64(0); i < n; i++ { + key, err := rd.ReadString() + if err != nil { + return nil, err + } -type XStream struct { - Stream string - Messages []*XMessage + m[key] = struct{}{} + } + return m, nil } +//------------------------------------------------------------------------------ + type XMessage struct { ID string Values map[string]interface{} } +type XMessageSliceCmd struct { + baseCmd + + val []XMessage +} + +var _ Cmder = (*XMessageSliceCmd)(nil) + +func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { + return &XMessageSliceCmd{ + baseCmd: baseCmd{_args: args}, + } +} + +func (cmd *XMessageSliceCmd) Val() []XMessage { + return cmd.val +} + +func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) { + return cmd.val, cmd.err +} + +func (cmd *XMessageSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(xMessageSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]XMessage) + return nil +} + +// Implements proto.MultiBulkParse +func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + msgs := make([]XMessage, 0, n) + for i := int64(0); i < n; i++ { + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(stringInterfaceMapParser) + if err != nil { + return nil, err + } + + msgs = append(msgs, XMessage{ + ID: id, + Values: v.(map[string]interface{}), + }) + return nil, nil + }) + if err != nil { + return nil, err + } + } + return msgs, nil +} + +// Implements proto.MultiBulkParse +func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]interface{}, n/2) + for i := int64(0); i < n; i += 2 { + key, err := rd.ReadString() + if err != nil { + return nil, err + } + + value, err := rd.ReadString() + if err != nil { + return nil, err + } + + m[key] = value + } + return m, nil +} + //------------------------------------------------------------------------------ +type XStream struct { + Stream string + Messages []XMessage +} + type XStreamSliceCmd struct { baseCmd - val []*XStream + val []XStream } var _ Cmder = (*XStreamSliceCmd)(nil) @@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd { } } -func (cmd *XStreamSliceCmd) Val() []*XStream { +func (cmd *XStreamSliceCmd) Val() []XStream { return cmd.val } -func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) { +func (cmd *XStreamSliceCmd) Result() ([]XStream, error) { return cmd.val, cmd.err } @@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser) + v, cmd.err = rd.ReadArrayReply(xStreamSliceParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XStream) + cmd.val = v.([]XStream) return nil } // Implements proto.MultiBulkParse func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - xx := make([]*XStream, n) + ret := make([]XStream, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xStreamParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + stream, err := rd.ReadString() + if err != nil { + return nil, err + } + + v, err := rd.ReadArrayReply(xMessageSliceParser) + if err != nil { + return nil, err + } + + ret = append(ret, XStream{ + Stream: stream, + Messages: v.([]XMessage), + }) + return nil, nil + }) if err != nil { return nil, err } - xx[i] = v.(*XStream) } - return xx, nil + return ret, nil } -// Implements proto.MultiBulkParse -func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) { - if n != 2 { - return nil, fmt.Errorf("got %d, wanted 2", n) +//------------------------------------------------------------------------------ + +type XPending struct { + Count int64 + Lower string + Higher string + Consumers map[string]int64 +} + +type XPendingCmd struct { + baseCmd + val *XPending +} + +var _ Cmder = (*XPendingCmd)(nil) + +func NewXPendingCmd(args ...interface{}) *XPendingCmd { + return &XPendingCmd{ + baseCmd: baseCmd{_args: args}, } +} + +func (cmd *XPendingCmd) Val() *XPending { + return cmd.val +} + +func (cmd *XPendingCmd) Result() (*XPending, error) { + return cmd.val, cmd.err +} + +func (cmd *XPendingCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *XPendingCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = info.(*XPending) + return nil +} - stream, err := rd.ReadStringReply() +func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + count, err := rd.ReadIntReply() if err != nil { return nil, err } - v, err := rd.ReadArrayReply(xMessageSliceParser) - if err != nil { + lower, err := rd.ReadString() + if err != nil && err != Nil { return nil, err } - return &XStream{ - Stream: stream, - Messages: v.([]*XMessage), - }, nil + higher, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + pending := &XPending{ + Count: count, + Lower: lower, + Higher: higher, + } + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + for i := int64(0); i < n; i++ { + _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 2 { + return nil, fmt.Errorf("got %d, wanted 2", n) + } + + consumerName, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumerPending, err := rd.ReadInt() + if err != nil { + return nil, err + } + + if pending.Consumers == nil { + pending.Consumers = make(map[string]int64) + } + pending.Consumers[consumerName] = consumerPending + + return nil, nil + }) + if err != nil { + return nil, err + } + } + return nil, nil + }) + if err != nil && err != Nil { + return nil, err + } + + return pending, nil } //------------------------------------------------------------------------------ -type XMessageSliceCmd struct { - baseCmd +type XPendingExt struct { + Id string + Consumer string + Idle time.Duration + RetryCount int64 +} - val []*XMessage +type XPendingExtCmd struct { + baseCmd + val []XPendingExt } -var _ Cmder = (*XMessageSliceCmd)(nil) +var _ Cmder = (*XPendingExtCmd)(nil) -func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd { - return &XMessageSliceCmd{ +func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd { + return &XPendingExtCmd{ baseCmd: baseCmd{_args: args}, } } -func (cmd *XMessageSliceCmd) Val() []*XMessage { +func (cmd *XPendingExtCmd) Val() []XPendingExt { return cmd.val } -func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) { +func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) { return cmd.val, cmd.err } -func (cmd *XMessageSliceCmd) String() string { +func (cmd *XPendingExtCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error { - var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser) +func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error { + var info interface{} + info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]*XMessage) + cmd.val = info.([]XPendingExt) return nil } -// Implements proto.MultiBulkParse -func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) { - msgs := make([]*XMessage, n) +func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + ret := make([]XPendingExt, 0, n) for i := int64(0); i < n; i++ { - v, err := rd.ReadArrayReply(xMessageParser) + _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) { + if n != 4 { + return nil, fmt.Errorf("got %d, wanted 4", n) + } + + id, err := rd.ReadString() + if err != nil { + return nil, err + } + + consumer, err := rd.ReadString() + if err != nil && err != Nil { + return nil, err + } + + idle, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + retryCount, err := rd.ReadIntReply() + if err != nil && err != Nil { + return nil, err + } + + ret = append(ret, XPendingExt{ + Id: id, + Consumer: consumer, + Idle: time.Duration(idle) * time.Millisecond, + RetryCount: retryCount, + }) + return nil, nil + }) if err != nil { return nil, err } - msgs[i] = v.(*XMessage) } - return msgs, nil + return ret, nil } -// Implements proto.MultiBulkParse -func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) { - id, err := rd.ReadStringReply() - if err != nil { - return nil, err - } +//------------------------------------------------------------------------------ - v, err := rd.ReadArrayReply(xKeyValueParser) - if err != nil { - return nil, err +//------------------------------------------------------------------------------ + +type ZSliceCmd struct { + baseCmd + + val []Z +} + +var _ Cmder = (*ZSliceCmd)(nil) + +func NewZSliceCmd(args ...interface{}) *ZSliceCmd { + return &ZSliceCmd{ + baseCmd: baseCmd{_args: args}, } +} + +func (cmd *ZSliceCmd) Val() []Z { + return cmd.val +} - return &XMessage{ - ID: id, - Values: v.(map[string]interface{}), - }, nil +func (cmd *ZSliceCmd) Result() ([]Z, error) { + return cmd.val, cmd.err +} + +func (cmd *ZSliceCmd) String() string { + return cmdString(cmd, cmd.val) +} + +func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error { + var v interface{} + v, cmd.err = rd.ReadArrayReply(zSliceParser) + if cmd.err != nil { + return cmd.err + } + cmd.val = v.([]Z) + return nil } // Implements proto.MultiBulkParse -func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) { - values := make(map[string]interface{}, n) +func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + zz := make([]Z, n/2) for i := int64(0); i < n; i += 2 { - key, err := rd.ReadStringReply() + var err error + + z := &zz[i/2] + + z.Member, err = rd.ReadString() if err != nil { return nil, err } - value, err := rd.ReadStringReply() + z.Score, err = rd.ReadFloatReply() if err != nil { return nil, err } - - values[key] = value } - return values, nil + return zz, nil } //------------------------------------------------------------------------------ -type ZSliceCmd struct { +type ZWithKeyCmd struct { baseCmd - val []Z + val ZWithKey } -var _ Cmder = (*ZSliceCmd)(nil) +var _ Cmder = (*ZWithKeyCmd)(nil) -func NewZSliceCmd(args ...interface{}) *ZSliceCmd { - return &ZSliceCmd{ +func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd { + return &ZWithKeyCmd{ baseCmd: baseCmd{_args: args}, } } -func (cmd *ZSliceCmd) Val() []Z { +func (cmd *ZWithKeyCmd) Val() ZWithKey { return cmd.val } -func (cmd *ZSliceCmd) Result() ([]Z, error) { - return cmd.val, cmd.err +func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) { + return cmd.Val(), cmd.Err() } -func (cmd *ZSliceCmd) String() string { +func (cmd *ZWithKeyCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error { +func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser) + v, cmd.err = rd.ReadArrayReply(zWithKeyParser) if cmd.err != nil { return cmd.err } - cmd.val = v.([]Z) + cmd.val = v.(ZWithKey) return nil } +// Implements proto.MultiBulkParse +func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) { + if n != 3 { + return nil, fmt.Errorf("got %d elements, expected 3", n) + } + + var z ZWithKey + var err error + + z.Key, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Member, err = rd.ReadString() + if err != nil { + return nil, err + } + z.Score, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + return z, nil +} + //------------------------------------------------------------------------------ type ScanCmd struct { @@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string { return cmdString(cmd, cmd.page) } -func (cmd *ScanCmd) readReply(cn *pool.Conn) error { - cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply() +func (cmd *ScanCmd) readReply(rd *proto.Reader) error { + cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply() return cmd.err } @@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { +func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser) + v, cmd.err = rd.ReadArrayReply(clusterSlotsParser) if cmd.err != nil { return cmd.err } @@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) { + slots := make([]ClusterSlot, n) + for i := 0; i < len(slots); i++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n < 2 { + err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n) + return nil, err + } + + start, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + end, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + + nodes := make([]ClusterNode, n-2) + for j := 0; j < len(nodes); j++ { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 && n != 3 { + err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n) + return nil, err + } + + ip, err := rd.ReadString() + if err != nil { + return nil, err + } + + port, err := rd.ReadString() + if err != nil { + return nil, err + } + + nodes[j].Addr = net.JoinHostPort(ip, port) + + if n == 3 { + id, err := rd.ReadString() + if err != nil { + return nil, err + } + nodes[j].Id = id + } + } + + slots[i] = ClusterSlot{ + Start: int(start), + End: int(end), + Nodes: nodes, + } + } + return slots, nil +} + //------------------------------------------------------------------------------ // GeoLocation is used with GeoAdd to add geospatial location. @@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string { return cmdString(cmd, cmd.locations) } -func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) + v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q)) if cmd.err != nil { return cmd.err } @@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error { return nil } +func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + var loc GeoLocation + var err error + + loc.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + if q.WithDist { + loc.Dist, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + if q.WithGeoHash { + loc.GeoHash, err = rd.ReadIntReply() + if err != nil { + return nil, err + } + } + if q.WithCoord { + n, err := rd.ReadArrayLen() + if err != nil { + return nil, err + } + if n != 2 { + return nil, fmt.Errorf("got %d coordinates, expected 2", n) + } + + loc.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + loc.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + } + + return &loc, nil + } +} + +func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse { + return func(rd *proto.Reader, n int64) (interface{}, error) { + locs := make([]GeoLocation, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(newGeoLocationParser(q)) + if err != nil { + return nil, err + } + switch vv := v.(type) { + case string: + locs = append(locs, GeoLocation{ + Name: vv, + }) + case *GeoLocation: + locs = append(locs, *vv) + default: + return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v) + } + } + return locs, nil + } +} + //------------------------------------------------------------------------------ type GeoPos struct { @@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string { return cmdString(cmd, cmd.positions) } -func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { +func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser) + v, cmd.err = rd.ReadArrayReply(geoPosSliceParser) if cmd.err != nil { return cmd.err } @@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error { return nil } +func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + positions := make([]*GeoPos, 0, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(geoPosParser) + if err != nil { + if err == Nil { + positions = append(positions, nil) + continue + } + return nil, err + } + switch v := v.(type) { + case *GeoPos: + positions = append(positions, v) + default: + return nil, fmt.Errorf("got %T, expected *GeoPos", v) + } + } + return positions, nil +} + +func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) { + var pos GeoPos + var err error + + pos.Longitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + pos.Latitude, err = rd.ReadFloatReply() + if err != nil { + return nil, err + } + + return &pos, nil +} + //------------------------------------------------------------------------------ type CommandInfo struct { @@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string { return cmdString(cmd, cmd.val) } -func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { +func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error { var v interface{} - v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser) + v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser) if cmd.err != nil { return cmd.err } @@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error { return nil } +// Implements proto.MultiBulkParse +func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) { + m := make(map[string]*CommandInfo, n) + for i := int64(0); i < n; i++ { + v, err := rd.ReadReply(commandInfoParser) + if err != nil { + return nil, err + } + vv := v.(*CommandInfo) + m[vv.Name] = vv + + } + return m, nil +} + +func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) { + var cmd CommandInfo + var err error + + if n != 6 { + return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n) + } + + cmd.Name, err = rd.ReadString() + if err != nil { + return nil, err + } + + arity, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.Arity = int8(arity) + + flags, err := rd.ReadReply(stringSliceParser) + if err != nil { + return nil, err + } + cmd.Flags = flags.([]string) + + firstKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.FirstKeyPos = int8(firstKeyPos) + + lastKeyPos, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.LastKeyPos = int8(lastKeyPos) + + stepCount, err := rd.ReadIntReply() + if err != nil { + return nil, err + } + cmd.StepCount = int8(stepCount) + + for _, flag := range cmd.Flags { + if flag == "readonly" { + cmd.ReadOnly = true + break + } + } + + return &cmd, nil +} + //------------------------------------------------------------------------------ type cmdsInfoCache struct { |