aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/types.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/types.go')
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/types.go428
1 files changed, 428 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/types.go b/src/dma/vendor/github.com/streadway/amqp/types.go
new file mode 100644
index 00000000..d3ece707
--- /dev/null
+++ b/src/dma/vendor/github.com/streadway/amqp/types.go
@@ -0,0 +1,428 @@
+// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/streadway/amqp
+
+package amqp
+
+import (
+ "fmt"
+ "io"
+ "time"
+)
+
+// Constants for standard AMQP 0-9-1 exchange types.
+const (
+ ExchangeDirect = "direct"
+ ExchangeFanout = "fanout"
+ ExchangeTopic = "topic"
+ ExchangeHeaders = "headers"
+)
+
+var (
+ // ErrClosed is returned when the channel or connection is not open
+ ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}
+
+ // ErrChannelMax is returned when Connection.Channel has been called enough
+ // times that all channel IDs have been exhausted in the client or the
+ // server.
+ ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}
+
+ // ErrSASL is returned from Dial when the authentication mechanism could not
+ // be negoated.
+ ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}
+
+ // ErrCredentials is returned when the authenticated client is not authorized
+ // to any vhost.
+ ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}
+
+ // ErrVhost is returned when the authenticated user is not permitted to
+ // access the requested Vhost.
+ ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}
+
+ // ErrSyntax is hard protocol error, indicating an unsupported protocol,
+ // implementation or encoding.
+ ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}
+
+ // ErrFrame is returned when the protocol frame cannot be read from the
+ // server, indicating an unsupported protocol or unsupported frame type.
+ ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}
+
+ // ErrCommandInvalid is returned when the server sends an unexpected response
+ // to this requested message type. This indicates a bug in this client.
+ ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}
+
+ // ErrUnexpectedFrame is returned when something other than a method or
+ // heartbeat frame is delivered to the Connection, indicating a bug in the
+ // client.
+ ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}
+
+ // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
+ ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
+)
+
+// Error captures the code and reason a channel or connection has been closed
+// by the server.
+type Error struct {
+ Code int // constant code from the specification
+ Reason string // description of the error
+ Server bool // true when initiated from the server, false when from this library
+ Recover bool // true when this error can be recovered by retrying later or with different parameters
+}
+
+func newError(code uint16, text string) *Error {
+ return &Error{
+ Code: int(code),
+ Reason: text,
+ Recover: isSoftExceptionCode(int(code)),
+ Server: true,
+ }
+}
+
+func (e Error) Error() string {
+ return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
+}
+
+// Used by header frames to capture routing and header information
+type properties struct {
+ ContentType string // MIME content type
+ ContentEncoding string // MIME content encoding
+ Headers Table // Application or header exchange table
+ DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2)
+ Priority uint8 // queue implementation use - 0 to 9
+ CorrelationId string // application use - correlation identifier
+ ReplyTo string // application use - address to to reply to (ex: RPC)
+ Expiration string // implementation use - message expiration spec
+ MessageId string // application use - message identifier
+ Timestamp time.Time // application use - message timestamp
+ Type string // application use - message type name
+ UserId string // application use - creating user id
+ AppId string // application use - creating application
+ reserved1 string // was cluster-id - process for buffer consumption
+}
+
+// DeliveryMode. Transient means higher throughput but messages will not be
+// restored on broker restart. The delivery mode of publishings is unrelated
+// to the durability of the queues they reside on. Transient messages will
+// not be restored to durable queues, persistent messages will be restored to
+// durable queues and lost on non-durable queues during server restart.
+//
+// This remains typed as uint8 to match Publishing.DeliveryMode. Other
+// delivery modes specific to custom queue implementations are not enumerated
+// here.
+const (
+ Transient uint8 = 1
+ Persistent uint8 = 2
+)
+
+// The property flags are an array of bits that indicate the presence or
+// absence of each property value in sequence. The bits are ordered from most
+// high to low - bit 15 indicates the first property.
+const (
+ flagContentType = 0x8000
+ flagContentEncoding = 0x4000
+ flagHeaders = 0x2000
+ flagDeliveryMode = 0x1000
+ flagPriority = 0x0800
+ flagCorrelationId = 0x0400
+ flagReplyTo = 0x0200
+ flagExpiration = 0x0100
+ flagMessageId = 0x0080
+ flagTimestamp = 0x0040
+ flagType = 0x0020
+ flagUserId = 0x0010
+ flagAppId = 0x0008
+ flagReserved1 = 0x0004
+)
+
+// Queue captures the current server state of the queue on the server returned
+// from Channel.QueueDeclare or Channel.QueueInspect.
+type Queue struct {
+ Name string // server confirmed or generated name
+ Messages int // count of messages not awaiting acknowledgment
+ Consumers int // number of consumers receiving deliveries
+}
+
+// Publishing captures the client message sent to the server. The fields
+// outside of the Headers table included in this struct mirror the underlying
+// fields in the content frame. They use native types for convenience and
+// efficiency.
+type Publishing struct {
+ // Application or exchange specific fields,
+ // the headers exchange will inspect this field.
+ Headers Table
+
+ // Properties
+ ContentType string // MIME content type
+ ContentEncoding string // MIME content encoding
+ DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
+ Priority uint8 // 0 to 9
+ CorrelationId string // correlation identifier
+ ReplyTo string // address to to reply to (ex: RPC)
+ Expiration string // message expiration spec
+ MessageId string // message identifier
+ Timestamp time.Time // message timestamp
+ Type string // message type name
+ UserId string // creating user id - ex: "guest"
+ AppId string // creating application id
+
+ // The application specific payload of the message
+ Body []byte
+}
+
+// Blocking notifies the server's TCP flow control of the Connection. When a
+// server hits a memory or disk alarm it will block all connections until the
+// resources are reclaimed. Use NotifyBlock on the Connection to receive these
+// events.
+type Blocking struct {
+ Active bool // TCP pushback active/inactive on server
+ Reason string // Server reason for activation
+}
+
+// Confirmation notifies the acknowledgment or negative acknowledgement of a
+// publishing identified by its delivery tag. Use NotifyPublish on the Channel
+// to consume these events.
+type Confirmation struct {
+ DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode
+ Ack bool // True when the server successfully received the publishing
+}
+
+// Decimal matches the AMQP decimal type. Scale is the number of decimal
+// digits Scale == 2, Value == 12345, Decimal == 123.45
+type Decimal struct {
+ Scale uint8
+ Value int32
+}
+
+// Table stores user supplied fields of the following types:
+//
+// bool
+// byte
+// float32
+// float64
+// int
+// int16
+// int32
+// int64
+// nil
+// string
+// time.Time
+// amqp.Decimal
+// amqp.Table
+// []byte
+// []interface{} - containing above types
+//
+// Functions taking a table will immediately fail when the table contains a
+// value of an unsupported type.
+//
+// The caller must be specific in which precision of integer it wishes to
+// encode.
+//
+// Use a type assertion when reading values from a table for type conversion.
+//
+// RabbitMQ expects int32 for integer values.
+//
+type Table map[string]interface{}
+
+func validateField(f interface{}) error {
+ switch fv := f.(type) {
+ case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
+ return nil
+
+ case []interface{}:
+ for _, v := range fv {
+ if err := validateField(v); err != nil {
+ return fmt.Errorf("in array %s", err)
+ }
+ }
+ return nil
+
+ case Table:
+ for k, v := range fv {
+ if err := validateField(v); err != nil {
+ return fmt.Errorf("table field %q %s", k, err)
+ }
+ }
+ return nil
+ }
+
+ return fmt.Errorf("value %t not supported", f)
+}
+
+// Validate returns and error if any Go types in the table are incompatible with AMQP types.
+func (t Table) Validate() error {
+ return validateField(t)
+}
+
+// Heap interface for maintaining delivery tags
+type tagSet []uint64
+
+func (set tagSet) Len() int { return len(set) }
+func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] }
+func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] }
+func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) }
+func (set *tagSet) Pop() interface{} {
+ val := (*set)[len(*set)-1]
+ *set = (*set)[:len(*set)-1]
+ return val
+}
+
+type message interface {
+ id() (uint16, uint16)
+ wait() bool
+ read(io.Reader) error
+ write(io.Writer) error
+}
+
+type messageWithContent interface {
+ message
+ getContent() (properties, []byte)
+ setContent(properties, []byte)
+}
+
+/*
+The base interface implemented as:
+
+2.3.5 frame Details
+
+All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects
+malformed frames:
+
+ 0 1 3 7 size+7 size+8
+ +------+---------+-------------+ +------------+ +-----------+
+ | type | channel | size | | payload | | frame-end |
+ +------+---------+-------------+ +------------+ +-----------+
+ octet short long size octets octet
+
+To read a frame, we:
+
+ 1. Read the header and check the frame type and channel.
+ 2. Depending on the frame type, we read the payload and process it.
+ 3. Read the frame end octet.
+
+In realistic implementations where performance is a concern, we would use
+“read-ahead buffering” or “gathering reads” to avoid doing three separate
+system calls to read a frame.
+
+*/
+type frame interface {
+ write(io.Writer) error
+ channel() uint16
+}
+
+type reader struct {
+ r io.Reader
+}
+
+type writer struct {
+ w io.Writer
+}
+
+// Implements the frame interface for Connection RPC
+type protocolHeader struct{}
+
+func (protocolHeader) write(w io.Writer) error {
+ _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
+ return err
+}
+
+func (protocolHeader) channel() uint16 {
+ panic("only valid as initial handshake")
+}
+
+/*
+Method frames carry the high-level protocol commands (which we call "methods").
+One method frame carries one command. The method frame payload has this format:
+
+ 0 2 4
+ +----------+-----------+-------------- - -
+ | class-id | method-id | arguments...
+ +----------+-----------+-------------- - -
+ short short ...
+
+To process a method frame, we:
+ 1. Read the method frame payload.
+ 2. Unpack it into a structure. A given method always has the same structure,
+ so we can unpack the method rapidly. 3. Check that the method is allowed in
+ the current context.
+ 4. Check that the method arguments are valid.
+ 5. Execute the method.
+
+Method frame bodies are constructed as a list of AMQP data fields (bits,
+integers, strings and string tables). The marshalling code is trivially
+generated directly from the protocol specifications, and can be very rapid.
+*/
+type methodFrame struct {
+ ChannelId uint16
+ ClassId uint16
+ MethodId uint16
+ Method message
+}
+
+func (f *methodFrame) channel() uint16 { return f.ChannelId }
+
+/*
+Heartbeating is a technique designed to undo one of TCP/IP's features, namely
+its ability to recover from a broken physical connection by closing only after
+a quite long time-out. In some scenarios we need to know very rapidly if a
+peer is disconnected or not responding for other reasons (e.g. it is looping).
+Since heartbeating can be done at a low level, we implement this as a special
+type of frame that peers exchange at the transport level, rather than as a
+class method.
+*/
+type heartbeatFrame struct {
+ ChannelId uint16
+}
+
+func (f *heartbeatFrame) channel() uint16 { return f.ChannelId }
+
+/*
+Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally
+defined as carrying content. When a peer sends such a method frame, it always
+follows it with a content header and zero or more content body frames.
+
+A content header frame has this format:
+
+ 0 2 4 12 14
+ +----------+--------+-----------+----------------+------------- - -
+ | class-id | weight | body size | property flags | property list...
+ +----------+--------+-----------+----------------+------------- - -
+ short short long long short remainder...
+
+We place content body in distinct frames (rather than including it in the
+method) so that AMQP may support "zero copy" techniques in which content is
+never marshalled or encoded. We place the content properties in their own
+frame so that recipients can selectively discard contents they do not want to
+process
+*/
+type headerFrame struct {
+ ChannelId uint16
+ ClassId uint16
+ weight uint16
+ Size uint64
+ Properties properties
+}
+
+func (f *headerFrame) channel() uint16 { return f.ChannelId }
+
+/*
+Content is the application data we carry from client-to-client via the AMQP
+server. Content is, roughly speaking, a set of properties plus a binary data
+part. The set of allowed properties are defined by the Basic class, and these
+form the "content header frame". The data can be any size, and MAY be broken
+into several (or many) chunks, each forming a "content body frame".
+
+Looking at the frames for a specific channel, as they pass on the wire, we
+might see something like this:
+
+ [method]
+ [method] [header] [body] [body]
+ [method]
+ ...
+*/
+type bodyFrame struct {
+ ChannelId uint16
+ Body []byte
+}
+
+func (f *bodyFrame) channel() uint16 { return f.ChannelId }