aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/confirms.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/confirms.go')
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/confirms.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/confirms.go b/src/dma/vendor/github.com/streadway/amqp/confirms.go
new file mode 100644
index 00000000..06cbaa71
--- /dev/null
+++ b/src/dma/vendor/github.com/streadway/amqp/confirms.go
@@ -0,0 +1,94 @@
+package amqp
+
+import "sync"
+
+// confirms resequences and notifies one or multiple publisher confirmation listeners
+type confirms struct {
+ m sync.Mutex
+ listeners []chan Confirmation
+ sequencer map[uint64]Confirmation
+ published uint64
+ expecting uint64
+}
+
+// newConfirms allocates a confirms
+func newConfirms() *confirms {
+ return &confirms{
+ sequencer: map[uint64]Confirmation{},
+ published: 0,
+ expecting: 1,
+ }
+}
+
+func (c *confirms) Listen(l chan Confirmation) {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ c.listeners = append(c.listeners, l)
+}
+
+// publish increments the publishing counter
+func (c *confirms) Publish() uint64 {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ c.published++
+ return c.published
+}
+
+// confirm confirms one publishing, increments the expecting delivery tag, and
+// removes bookkeeping for that delivery tag.
+func (c *confirms) confirm(confirmation Confirmation) {
+ delete(c.sequencer, c.expecting)
+ c.expecting++
+ for _, l := range c.listeners {
+ l <- confirmation
+ }
+}
+
+// resequence confirms any out of order delivered confirmations
+func (c *confirms) resequence() {
+ for c.expecting <= c.published {
+ sequenced, found := c.sequencer[c.expecting]
+ if !found {
+ return
+ }
+ c.confirm(sequenced)
+ }
+}
+
+// one confirms one publishing and all following in the publishing sequence
+func (c *confirms) One(confirmed Confirmation) {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if c.expecting == confirmed.DeliveryTag {
+ c.confirm(confirmed)
+ } else {
+ c.sequencer[confirmed.DeliveryTag] = confirmed
+ }
+ c.resequence()
+}
+
+// multiple confirms all publishings up until the delivery tag
+func (c *confirms) Multiple(confirmed Confirmation) {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ for c.expecting <= confirmed.DeliveryTag {
+ c.confirm(Confirmation{c.expecting, confirmed.Ack})
+ }
+ c.resequence()
+}
+
+// Close closes all listeners, discarding any out of sequence confirmations
+func (c *confirms) Close() error {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ for _, l := range c.listeners {
+ close(l)
+ }
+ c.listeners = nil
+ return nil
+}