aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/delivery.go
blob: 7241264423148578b89fcbd5efce44b0cc173263 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Source code and contact info at http://github.com/streadway/amqp

package amqp

import (
	"errors"
	"time"
)

// errDeliveryNotInitialized is returned by Delivery.Ack, Delivery.Reject and
// Delivery.Nack when the Delivery has a nil Acknowledger.
var errDeliveryNotInitialized = errors.New("delivery not initialized")

// Acknowledger notifies the server of successful or failed consumption of
// deliveries via the identifier found in the Delivery.DeliveryTag field.
//
// Applications can provide mock implementations in tests of Delivery handlers.
type Acknowledger interface {
	// Ack is invoked by Delivery.Ack with the delivery's DeliveryTag as tag.
	Ack(tag uint64, multiple bool) error
	// Nack is invoked by Delivery.Nack with the delivery's DeliveryTag as tag.
	Nack(tag uint64, multiple bool, requeue bool) error
	// Reject is invoked by Delivery.Reject with the delivery's DeliveryTag as
	// tag.
	Reject(tag uint64, requeue bool) error
}

// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
//
// The Ack, Reject and Nack methods forward to the Acknowledger field; a
// Delivery whose Acknowledger is nil reports an error from those methods
// instead of calling through.
type Delivery struct {
	Acknowledger Acknowledger // the channel from which this delivery arrived

	Headers Table // Application or header exchange table

	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // queue implementation use - non-persistent (1) or persistent (2)
	Priority        uint8     // queue implementation use - 0 to 9
	CorrelationId   string    // application use - correlation identifier
	ReplyTo         string    // application use - address to reply to (ex: RPC)
	Expiration      string    // implementation use - message expiration spec
	MessageId       string    // application use - message identifier
	Timestamp       time.Time // application use - message timestamp
	Type            string    // application use - message type name
	UserId          string    // application use - creating user - should be authenticated user
	AppId           string    // application use - creating application id

	// Valid only with Channel.Consume
	ConsumerTag string

	// Valid only with Channel.Get
	MessageCount uint32

	// DeliveryTag is the identifier handed to the Acknowledger by Ack, Reject
	// and Nack.
	DeliveryTag uint64
	// Redelivered is copied from the Redelivered flag of the underlying
	// deliver / get-ok message.
	Redelivered bool
	Exchange    string // basic.publish exchange
	RoutingKey  string // basic.publish routing key

	// Body is the message content returned alongside the properties.
	Body []byte
}

// newDelivery builds a Delivery from a content-bearing message received on
// channel. The properties and body come from msg.getContent(), and channel is
// installed as the Acknowledger. Tag, redelivery and routing fields are filled
// in only when msg is a *basicDeliver or a *basicGetOk; for any other message
// type they keep their zero values.
func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
	meta, payload := msg.getContent()

	d := &Delivery{
		Acknowledger: channel,
		Body:         payload,

		Headers:         meta.Headers,
		ContentType:     meta.ContentType,
		ContentEncoding: meta.ContentEncoding,
		DeliveryMode:    meta.DeliveryMode,
		Priority:        meta.Priority,
		CorrelationId:   meta.CorrelationId,
		ReplyTo:         meta.ReplyTo,
		Expiration:      meta.Expiration,
		MessageId:       meta.MessageId,
		Timestamp:       meta.Timestamp,
		Type:            meta.Type,
		UserId:          meta.UserId,
		AppId:           meta.AppId,
	}

	// The two method frames share tag and routing fields but differ in one:
	// get-ok carries MessageCount, deliver carries ConsumerTag.
	switch src := msg.(type) {
	case *basicGetOk:
		d.MessageCount = src.MessageCount
		d.DeliveryTag, d.Redelivered = src.DeliveryTag, src.Redelivered
		d.Exchange, d.RoutingKey = src.Exchange, src.RoutingKey

	case *basicDeliver:
		d.ConsumerTag = src.ConsumerTag
		d.DeliveryTag, d.Redelivered = src.DeliveryTag, src.Redelivered
		d.Exchange, d.RoutingKey = src.Exchange, src.RoutingKey
	}

	return d
}

/*
Ack forwards a positive acknowledgement for this delivery to its Acknowledger,
signalling that the client or server has finished work on it.

All deliveries in AMQP must be acknowledged.  If Channel.Consume was called
with autoAck true, the server acknowledges each message automatically and this
method should not be called.  Otherwise, call Delivery.Ack once the delivery
has been processed successfully.

When multiple is true, this delivery and all prior unacknowledged deliveries
on the same channel are acknowledged, which is useful when processing
deliveries in batches.

A non-nil error means the acknowledgement could not be delivered to the
channel it was sent from.  If the Delivery has no Acknowledger, a "delivery
not initialized" error is returned and nothing is sent.

Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Ack(multiple bool) error {
	if acker := d.Acknowledger; acker != nil {
		return acker.Ack(d.DeliveryTag, multiple)
	}
	return errDeliveryNotInitialized
}

/*
Reject forwards a negative acknowledgement for this delivery to its
Acknowledger.

When requeue is true, the message is queued to be delivered to a consumer on a
different channel.  When requeue is false, or the server is unable to queue
the message, it will be dropped.

If you are batch processing deliveries, and your server supports it, prefer
Delivery.Nack.

If the Delivery has no Acknowledger, a "delivery not initialized" error is
returned and nothing is sent.

Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Reject(requeue bool) error {
	if acker := d.Acknowledger; acker != nil {
		return acker.Reject(d.DeliveryTag, requeue)
	}
	return errDeliveryNotInitialized
}

/*
Nack negatively acknowledges the delivery of message(s) identified by the
delivery tag, from either the client or server, by forwarding to the
Acknowledger.

When multiple is true, messages delivered on the same channel up to and
including this delivery tag are nacked.

When requeue is true, the server is asked to deliver this message to a
different consumer.  If that is not possible, or requeue is false, the message
will be dropped or delivered to a server configured dead-letter queue.

This method must not be used to select or requeue messages the client wishes
not to handle; rather it informs the server that the client is incapable of
handling this message at this time.

If the Delivery has no Acknowledger, a "delivery not initialized" error is
returned and nothing is sent.

Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Nack(multiple, requeue bool) error {
	if acker := d.Acknowledger; acker != nil {
		return acker.Nack(d.DeliveryTag, multiple, requeue)
	}
	return errDeliveryNotInitialized
}