diff options
author | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-06 09:04:29 +0000 |
---|---|---|
committer | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-07 06:03:01 +0000 |
commit | d61931341176dad9ccff7c967a10d88fe54218fa (patch) | |
tree | 526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/vendor/github.com/streadway/amqp/confirms.go | |
parent | 73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff) |
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa
Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/confirms.go')
-rw-r--r-- | src/dma/vendor/github.com/streadway/amqp/confirms.go | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/confirms.go b/src/dma/vendor/github.com/streadway/amqp/confirms.go new file mode 100644 index 00000000..06cbaa71 --- /dev/null +++ b/src/dma/vendor/github.com/streadway/amqp/confirms.go @@ -0,0 +1,94 @@ +package amqp + +import "sync" + +// confirms resequences and notifies one or multiple publisher confirmation listeners +type confirms struct { + m sync.Mutex + listeners []chan Confirmation + sequencer map[uint64]Confirmation + published uint64 + expecting uint64 +} + +// newConfirms allocates a confirms +func newConfirms() *confirms { + return &confirms{ + sequencer: map[uint64]Confirmation{}, + published: 0, + expecting: 1, + } +} + +func (c *confirms) Listen(l chan Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + c.listeners = append(c.listeners, l) +} + +// publish increments the publishing counter +func (c *confirms) Publish() uint64 { + c.m.Lock() + defer c.m.Unlock() + + c.published++ + return c.published +} + +// confirm confirms one publishing, increments the expecting delivery tag, and +// removes bookkeeping for that delivery tag. +func (c *confirms) confirm(confirmation Confirmation) { + delete(c.sequencer, c.expecting) + c.expecting++ + for _, l := range c.listeners { + l <- confirmation + } +} + +// resequence confirms any out of order delivered confirmations +func (c *confirms) resequence() { + for c.expecting <= c.published { + sequenced, found := c.sequencer[c.expecting] + if !found { + return + } + c.confirm(sequenced) + } +} + +// one confirms one publishing and all following in the publishing sequence +func (c *confirms) One(confirmed Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + if c.expecting == confirmed.DeliveryTag { + c.confirm(confirmed) + } else { + c.sequencer[confirmed.DeliveryTag] = confirmed + } + c.resequence() +} + +// multiple confirms all publishings up until the delivery tag +func (c *confirms) Multiple(confirmed Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + for c.expecting <= confirmed.DeliveryTag { + c.confirm(Confirmation{c.expecting, confirmed.Ack}) + } + c.resequence() +} + +// Close closes all listeners, discarding any out of sequence confirmations +func (c *confirms) Close() error { + c.m.Lock() + defer c.m.Unlock() + + for _, l := range c.listeners { + close(l) + } + c.listeners = nil + return nil +} |