aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/write.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/write.go')
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/write.go416
1 files changed, 416 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/write.go b/src/dma/vendor/github.com/streadway/amqp/write.go
new file mode 100644
index 00000000..94a46d11
--- /dev/null
+++ b/src/dma/vendor/github.com/streadway/amqp/write.go
@@ -0,0 +1,416 @@
+// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/streadway/amqp
+
+package amqp
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "io"
+ "math"
+ "time"
+)
+
+func (w *writer) WriteFrame(frame frame) (err error) {
+ if err = frame.write(w.w); err != nil {
+ return
+ }
+
+ if buf, ok := w.w.(*bufio.Writer); ok {
+ err = buf.Flush()
+ }
+
+ return
+}
+
+func (f *methodFrame) write(w io.Writer) (err error) {
+ var payload bytes.Buffer
+
+ if f.Method == nil {
+ return errors.New("malformed frame: missing method")
+ }
+
+ class, method := f.Method.id()
+
+ if err = binary.Write(&payload, binary.BigEndian, class); err != nil {
+ return
+ }
+
+ if err = binary.Write(&payload, binary.BigEndian, method); err != nil {
+ return
+ }
+
+ if err = f.Method.write(&payload); err != nil {
+ return
+ }
+
+ return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes())
+}
+
+// Heartbeat
+//
+// Payload is empty
+func (f *heartbeatFrame) write(w io.Writer) (err error) {
+ return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{})
+}
+
+// CONTENT HEADER
+// 0 2 4 12 14
+// +----------+--------+-----------+----------------+------------- - -
+// | class-id | weight | body size | property flags | property list...
+// +----------+--------+-----------+----------------+------------- - -
+// short short long long short remainder...
+//
+func (f *headerFrame) write(w io.Writer) (err error) {
+ var payload bytes.Buffer
+ var zeroTime time.Time
+
+ if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil {
+ return
+ }
+
+ if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil {
+ return
+ }
+
+ if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil {
+ return
+ }
+
+ // First pass will build the mask to be serialized, second pass will serialize
+ // each of the fields that appear in the mask.
+
+ var mask uint16
+
+ if len(f.Properties.ContentType) > 0 {
+ mask = mask | flagContentType
+ }
+ if len(f.Properties.ContentEncoding) > 0 {
+ mask = mask | flagContentEncoding
+ }
+ if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 {
+ mask = mask | flagHeaders
+ }
+ if f.Properties.DeliveryMode > 0 {
+ mask = mask | flagDeliveryMode
+ }
+ if f.Properties.Priority > 0 {
+ mask = mask | flagPriority
+ }
+ if len(f.Properties.CorrelationId) > 0 {
+ mask = mask | flagCorrelationId
+ }
+ if len(f.Properties.ReplyTo) > 0 {
+ mask = mask | flagReplyTo
+ }
+ if len(f.Properties.Expiration) > 0 {
+ mask = mask | flagExpiration
+ }
+ if len(f.Properties.MessageId) > 0 {
+ mask = mask | flagMessageId
+ }
+ if f.Properties.Timestamp != zeroTime {
+ mask = mask | flagTimestamp
+ }
+ if len(f.Properties.Type) > 0 {
+ mask = mask | flagType
+ }
+ if len(f.Properties.UserId) > 0 {
+ mask = mask | flagUserId
+ }
+ if len(f.Properties.AppId) > 0 {
+ mask = mask | flagAppId
+ }
+
+ if err = binary.Write(&payload, binary.BigEndian, mask); err != nil {
+ return
+ }
+
+ if hasProperty(mask, flagContentType) {
+ if err = writeShortstr(&payload, f.Properties.ContentType); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagContentEncoding) {
+ if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagHeaders) {
+ if err = writeTable(&payload, f.Properties.Headers); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagDeliveryMode) {
+ if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagPriority) {
+ if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagCorrelationId) {
+ if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagReplyTo) {
+ if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagExpiration) {
+ if err = writeShortstr(&payload, f.Properties.Expiration); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagMessageId) {
+ if err = writeShortstr(&payload, f.Properties.MessageId); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagTimestamp) {
+ if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagType) {
+ if err = writeShortstr(&payload, f.Properties.Type); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagUserId) {
+ if err = writeShortstr(&payload, f.Properties.UserId); err != nil {
+ return
+ }
+ }
+ if hasProperty(mask, flagAppId) {
+ if err = writeShortstr(&payload, f.Properties.AppId); err != nil {
+ return
+ }
+ }
+
+ return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes())
+}
+
+// Body
+//
+// Payload is one byterange from the full body who's size is declared in the
+// Header frame
+func (f *bodyFrame) write(w io.Writer) (err error) {
+ return writeFrame(w, frameBody, f.ChannelId, f.Body)
+}
+
+func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) {
+ end := []byte{frameEnd}
+ size := uint(len(payload))
+
+ _, err = w.Write([]byte{
+ byte(typ),
+ byte((channel & 0xff00) >> 8),
+ byte((channel & 0x00ff) >> 0),
+ byte((size & 0xff000000) >> 24),
+ byte((size & 0x00ff0000) >> 16),
+ byte((size & 0x0000ff00) >> 8),
+ byte((size & 0x000000ff) >> 0),
+ })
+
+ if err != nil {
+ return
+ }
+
+ if _, err = w.Write(payload); err != nil {
+ return
+ }
+
+ if _, err = w.Write(end); err != nil {
+ return
+ }
+
+ return
+}
+
+func writeShortstr(w io.Writer, s string) (err error) {
+ b := []byte(s)
+
+ var length = uint8(len(b))
+
+ if err = binary.Write(w, binary.BigEndian, length); err != nil {
+ return
+ }
+
+ if _, err = w.Write(b[:length]); err != nil {
+ return
+ }
+
+ return
+}
+
+func writeLongstr(w io.Writer, s string) (err error) {
+ b := []byte(s)
+
+ var length = uint32(len(b))
+
+ if err = binary.Write(w, binary.BigEndian, length); err != nil {
+ return
+ }
+
+ if _, err = w.Write(b[:length]); err != nil {
+ return
+ }
+
+ return
+}
+
+/*
+'A': []interface{}
+'D': Decimal
+'F': Table
+'I': int32
+'S': string
+'T': time.Time
+'V': nil
+'b': byte
+'d': float64
+'f': float32
+'l': int64
+'s': int16
+'t': bool
+'x': []byte
+*/
+func writeField(w io.Writer, value interface{}) (err error) {
+ var buf [9]byte
+ var enc []byte
+
+ switch v := value.(type) {
+ case bool:
+ buf[0] = 't'
+ if v {
+ buf[1] = byte(1)
+ } else {
+ buf[1] = byte(0)
+ }
+ enc = buf[:2]
+
+ case byte:
+ buf[0] = 'b'
+ buf[1] = byte(v)
+ enc = buf[:2]
+
+ case int16:
+ buf[0] = 's'
+ binary.BigEndian.PutUint16(buf[1:3], uint16(v))
+ enc = buf[:3]
+
+ case int:
+ buf[0] = 'I'
+ binary.BigEndian.PutUint32(buf[1:5], uint32(v))
+ enc = buf[:5]
+
+ case int32:
+ buf[0] = 'I'
+ binary.BigEndian.PutUint32(buf[1:5], uint32(v))
+ enc = buf[:5]
+
+ case int64:
+ buf[0] = 'l'
+ binary.BigEndian.PutUint64(buf[1:9], uint64(v))
+ enc = buf[:9]
+
+ case float32:
+ buf[0] = 'f'
+ binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v))
+ enc = buf[:5]
+
+ case float64:
+ buf[0] = 'd'
+ binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
+ enc = buf[:9]
+
+ case Decimal:
+ buf[0] = 'D'
+ buf[1] = byte(v.Scale)
+ binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value))
+ enc = buf[:6]
+
+ case string:
+ buf[0] = 'S'
+ binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
+ enc = append(buf[:5], []byte(v)...)
+
+ case []interface{}: // field-array
+ buf[0] = 'A'
+
+ sec := new(bytes.Buffer)
+ for _, val := range v {
+ if err = writeField(sec, val); err != nil {
+ return
+ }
+ }
+
+ binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len()))
+ if _, err = w.Write(buf[:5]); err != nil {
+ return
+ }
+
+ if _, err = w.Write(sec.Bytes()); err != nil {
+ return
+ }
+
+ return
+
+ case time.Time:
+ buf[0] = 'T'
+ binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix()))
+ enc = buf[:9]
+
+ case Table:
+ if _, err = w.Write([]byte{'F'}); err != nil {
+ return
+ }
+ return writeTable(w, v)
+
+ case []byte:
+ buf[0] = 'x'
+ binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
+ if _, err = w.Write(buf[0:5]); err != nil {
+ return
+ }
+ if _, err = w.Write(v); err != nil {
+ return
+ }
+ return
+
+ case nil:
+ buf[0] = 'V'
+ enc = buf[:1]
+
+ default:
+ return ErrFieldType
+ }
+
+ _, err = w.Write(enc)
+
+ return
+}
+
+func writeTable(w io.Writer, table Table) (err error) {
+ var buf bytes.Buffer
+
+ for key, val := range table {
+ if err = writeShortstr(&buf, key); err != nil {
+ return
+ }
+ if err = writeField(&buf, val); err != nil {
+ return
+ }
+ }
+
+ return writeLongstr(w, string(buf.Bytes()))
+}