From d61931341176dad9ccff7c967a10d88fe54218fa Mon Sep 17 00:00:00 2001 From: Toshiaki Takahashi Date: Thu, 6 Sep 2018 09:04:29 +0000 Subject: src: Add DMA localagent Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa Signed-off-by: Toshiaki Takahashi --- .../vendor/github.com/streadway/amqp/confirms.go | 94 ++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 src/dma/vendor/github.com/streadway/amqp/confirms.go (limited to 'src/dma/vendor/github.com/streadway/amqp/confirms.go') diff --git a/src/dma/vendor/github.com/streadway/amqp/confirms.go b/src/dma/vendor/github.com/streadway/amqp/confirms.go new file mode 100644 index 00000000..06cbaa71 --- /dev/null +++ b/src/dma/vendor/github.com/streadway/amqp/confirms.go @@ -0,0 +1,94 @@ +package amqp + +import "sync" + +// confirms resequences and notifies one or multiple publisher confirmation listeners +type confirms struct { + m sync.Mutex + listeners []chan Confirmation + sequencer map[uint64]Confirmation + published uint64 + expecting uint64 +} + +// newConfirms allocates a confirms +func newConfirms() *confirms { + return &confirms{ + sequencer: map[uint64]Confirmation{}, + published: 0, + expecting: 1, + } +} + +func (c *confirms) Listen(l chan Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + c.listeners = append(c.listeners, l) +} + +// publish increments the publishing counter +func (c *confirms) Publish() uint64 { + c.m.Lock() + defer c.m.Unlock() + + c.published++ + return c.published +} + +// confirm confirms one publishing, increments the expecting delivery tag, and +// removes bookkeeping for that delivery tag. +func (c *confirms) confirm(confirmation Confirmation) { + delete(c.sequencer, c.expecting) + c.expecting++ + for _, l := range c.listeners { + l <- confirmation + } +} + +// resequence confirms any out of order delivered confirmations +func (c *confirms) resequence() { + for c.expecting <= c.published { + sequenced, found := c.sequencer[c.expecting] + if !found { + return + } + c.confirm(sequenced) + } +} + +// one confirms one publishing and all following in the publishing sequence +func (c *confirms) One(confirmed Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + if c.expecting == confirmed.DeliveryTag { + c.confirm(confirmed) + } else { + c.sequencer[confirmed.DeliveryTag] = confirmed + } + c.resequence() +} + +// multiple confirms all publishings up until the delivery tag +func (c *confirms) Multiple(confirmed Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + for c.expecting <= confirmed.DeliveryTag { + c.confirm(Confirmation{c.expecting, confirmed.Ack}) + } + c.resequence() +} + +// Close closes all listeners, discarding any out of sequence confirmations +func (c *confirms) Close() error { + c.m.Lock() + defer c.m.Unlock() + + for _, l := range c.listeners { + close(l) + } + c.listeners = nil + return nil +} -- cgit 1.2.3-korg