aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/delivery.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/delivery.go')
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/delivery.go173
1 files changed, 173 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/delivery.go b/src/dma/vendor/github.com/streadway/amqp/delivery.go
new file mode 100644
index 00000000..72412644
--- /dev/null
+++ b/src/dma/vendor/github.com/streadway/amqp/delivery.go
@@ -0,0 +1,173 @@
+// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/streadway/amqp
+
+package amqp
+
+import (
+ "errors"
+ "time"
+)
+
+var errDeliveryNotInitialized = errors.New("delivery not initialized")
+
+// Acknowledger notifies the server of successful or failed consumption of
+// delivieries via identifier found in the Delivery.DeliveryTag field.
+//
+// Applications can provide mock implementations in tests of Delivery handlers.
+type Acknowledger interface {
+ Ack(tag uint64, multiple bool) error
+ Nack(tag uint64, multiple bool, requeue bool) error
+ Reject(tag uint64, requeue bool) error
+}
+
+// Delivery captures the fields for a previously delivered message resident in
+// a queue to be delivered by the server to a consumer from Channel.Consume or
+// Channel.Get.
+type Delivery struct {
+ Acknowledger Acknowledger // the channel from which this delivery arrived
+
+ Headers Table // Application or header exchange table
+
+ // Properties
+ ContentType string // MIME content type
+ ContentEncoding string // MIME content encoding
+ DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
+ Priority uint8 // queue implementation use - 0 to 9
+ CorrelationId string // application use - correlation identifier
+ ReplyTo string // application use - address to reply to (ex: RPC)
+ Expiration string // implementation use - message expiration spec
+ MessageId string // application use - message identifier
+ Timestamp time.Time // application use - message timestamp
+ Type string // application use - message type name
+ UserId string // application use - creating user - should be authenticated user
+ AppId string // application use - creating application id
+
+ // Valid only with Channel.Consume
+ ConsumerTag string
+
+ // Valid only with Channel.Get
+ MessageCount uint32
+
+ DeliveryTag uint64
+ Redelivered bool
+ Exchange string // basic.publish exchange
+ RoutingKey string // basic.publish routing key
+
+ Body []byte
+}
+
+func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
+ props, body := msg.getContent()
+
+ delivery := Delivery{
+ Acknowledger: channel,
+
+ Headers: props.Headers,
+ ContentType: props.ContentType,
+ ContentEncoding: props.ContentEncoding,
+ DeliveryMode: props.DeliveryMode,
+ Priority: props.Priority,
+ CorrelationId: props.CorrelationId,
+ ReplyTo: props.ReplyTo,
+ Expiration: props.Expiration,
+ MessageId: props.MessageId,
+ Timestamp: props.Timestamp,
+ Type: props.Type,
+ UserId: props.UserId,
+ AppId: props.AppId,
+
+ Body: body,
+ }
+
+ // Properties for the delivery types
+ switch m := msg.(type) {
+ case *basicDeliver:
+ delivery.ConsumerTag = m.ConsumerTag
+ delivery.DeliveryTag = m.DeliveryTag
+ delivery.Redelivered = m.Redelivered
+ delivery.Exchange = m.Exchange
+ delivery.RoutingKey = m.RoutingKey
+
+ case *basicGetOk:
+ delivery.MessageCount = m.MessageCount
+ delivery.DeliveryTag = m.DeliveryTag
+ delivery.Redelivered = m.Redelivered
+ delivery.Exchange = m.Exchange
+ delivery.RoutingKey = m.RoutingKey
+ }
+
+ return &delivery
+}
+
+/*
+Ack delegates an acknowledgement through the Acknowledger interface that the
+client or server has finished work on a delivery.
+
+All deliveries in AMQP must be acknowledged. If you called Channel.Consume
+with autoAck true then the server will be automatically ack each message and
+this method should not be called. Otherwise, you must call Delivery.Ack after
+you have successfully processed this delivery.
+
+When multiple is true, this delivery and all prior unacknowledged deliveries
+on the same channel will be acknowledged. This is useful for batch processing
+of deliveries.
+
+An error will indicate that the acknowledge could not be delivered to the
+channel it was sent from.
+
+Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
+delivery that is not automatically acknowledged.
+*/
+func (d Delivery) Ack(multiple bool) error {
+ if d.Acknowledger == nil {
+ return errDeliveryNotInitialized
+ }
+ return d.Acknowledger.Ack(d.DeliveryTag, multiple)
+}
+
+/*
+Reject delegates a negatively acknowledgement through the Acknowledger interface.
+
+When requeue is true, queue this message to be delivered to a consumer on a
+different channel. When requeue is false or the server is unable to queue this
+message, it will be dropped.
+
+If you are batch processing deliveries, and your server supports it, prefer
+Delivery.Nack.
+
+Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
+delivery that is not automatically acknowledged.
+*/
+func (d Delivery) Reject(requeue bool) error {
+ if d.Acknowledger == nil {
+ return errDeliveryNotInitialized
+ }
+ return d.Acknowledger.Reject(d.DeliveryTag, requeue)
+}
+
+/*
+Nack negatively acknowledge the delivery of message(s) identified by the
+delivery tag from either the client or server.
+
+When multiple is true, nack messages up to and including delivered messages up
+until the delivery tag delivered on the same channel.
+
+When requeue is true, request the server to deliver this message to a different
+consumer. If it is not possible or requeue is false, the message will be
+dropped or delivered to a server configured dead-letter queue.
+
+This method must not be used to select or requeue messages the client wishes
+not to handle, rather it is to inform the server that the client is incapable
+of handling this message at this time.
+
+Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
+delivery that is not automatically acknowledged.
+*/
+func (d Delivery) Nack(multiple, requeue bool) error {
+ if d.Acknowledger == nil {
+ return errDeliveryNotInitialized
+ }
+ return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
+}