diff options
author | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-06 09:04:29 +0000 |
---|---|---|
committer | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-07 06:03:01 +0000 |
commit | d61931341176dad9ccff7c967a10d88fe54218fa (patch) | |
tree | 526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/vendor/github.com/streadway/amqp/delivery.go | |
parent | 73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff) |
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa
Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/delivery.go')
-rw-r--r-- | src/dma/vendor/github.com/streadway/amqp/delivery.go | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/delivery.go b/src/dma/vendor/github.com/streadway/amqp/delivery.go new file mode 100644 index 00000000..304c8346 --- /dev/null +++ b/src/dma/vendor/github.com/streadway/amqp/delivery.go @@ -0,0 +1,173 @@ +// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// Source code and contact info at http://github.com/streadway/amqp + +package amqp + +import ( + "errors" + "time" +) + +var errDeliveryNotInitialized = errors.New("delivery not initialized") + +// Acknowledger notifies the server of successful or failed consumption of +// delivieries via identifier found in the Delivery.DeliveryTag field. +// +// Applications can provide mock implementations in tests of Delivery handlers. +type Acknowledger interface { + Ack(tag uint64, multiple bool) error + Nack(tag uint64, multiple bool, requeue bool) error + Reject(tag uint64, requeue bool) error +} + +// Delivery captures the fields for a previously delivered message resident in +// a queue to be delivered by the server to a consumer from Channel.Consume or +// Channel.Get. +type Delivery struct { + Acknowledger Acknowledger // the channel from which this delivery arrived + + Headers Table // Application or header exchange table + + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user - should be authenticated user + AppId string // application use - creating application id + + // Valid only with Channel.Consume + ConsumerTag string + + // Valid only with Channel.Get + MessageCount uint32 + + DeliveryTag uint64 + Redelivered bool + Exchange string // basic.publish exhange + RoutingKey string // basic.publish routing key + + Body []byte +} + +func newDelivery(channel *Channel, msg messageWithContent) *Delivery { + props, body := msg.getContent() + + delivery := Delivery{ + Acknowledger: channel, + + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, + + Body: body, + } + + // Properties for the delivery types + switch m := msg.(type) { + case *basicDeliver: + delivery.ConsumerTag = m.ConsumerTag + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + + case *basicGetOk: + delivery.MessageCount = m.MessageCount + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + } + + return &delivery +} + +/* +Ack delegates an acknowledgement through the Acknowledger interface that the +client or server has finished work on a delivery. + +All deliveries in AMQP must be acknowledged. If you called Channel.Consume +with autoAck true then the server will be automatically ack each message and +this method should not be called. Otherwise, you must call Delivery.Ack after +you have successfully processed this delivery. + +When multiple is true, this delivery and all prior unacknowledged deliveries +on the same channel will be acknowledged. This is useful for batch processing +of deliveries. + +An error will indicate that the acknowledge could not be delivered to the +channel it was sent from. + +Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every +delivery that is not automatically acknowledged. +*/ +func (d Delivery) Ack(multiple bool) error { + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Ack(d.DeliveryTag, multiple) +} + +/* +Reject delegates a negatively acknowledgement through the Acknowledger interface. + +When requeue is true, queue this message to be delivered to a consumer on a +different channel. When requeue is false or the server is unable to queue this +message, it will be dropped. + +If you are batch processing deliveries, and your server supports it, prefer +Delivery.Nack. + +Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every +delivery that is not automatically acknowledged. +*/ +func (d Delivery) Reject(requeue bool) error { + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Reject(d.DeliveryTag, requeue) +} + +/* +Nack negatively acknowledge the delivery of message(s) identified by the +delivery tag from either the client or server. + +When multiple is true, nack messages up to and including delivered messages up +until the delivery tag delivered on the same channel. + +When requeue is true, request the server to deliver this message to a different +consumer. If it is not possible or requeue is false, the message will be +dropped or delivered to a server configured dead-letter queue. + +This method must not be used to select or requeue messages the client wishes +not to handle, rather it is to inform the server that the client is incapable +of handling this message at this time. + +Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every +delivery that is not automatically acknowledged. +*/ +func (d Delivery) Nack(multiple, requeue bool) error { + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) +} |