summaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/connection.go')
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/connection.go842
1 files changed, 842 insertions, 0 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/connection.go b/src/dma/vendor/github.com/streadway/amqp/connection.go
new file mode 100644
index 00000000..ca1372d0
--- /dev/null
+++ b/src/dma/vendor/github.com/streadway/amqp/connection.go
@@ -0,0 +1,842 @@
+// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/streadway/amqp
+
+package amqp
+
+import (
+ "bufio"
+ "crypto/tls"
+ "io"
+ "net"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ maxChannelMax = (2 << 15) - 1
+
+ defaultHeartbeat = 10 * time.Second
+ defaultConnectionTimeout = 30 * time.Second
+ defaultProduct = "https://github.com/streadway/amqp"
+ defaultVersion = "β"
+ // Safer default that makes channel leaks a lot easier to spot
+ // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
+ defaultChannelMax = (2 << 10) - 1
+ defaultLocale = "en_US"
+)
+
+// Config is used in DialConfig and Open to specify the desired tuning
+// parameters used during a connection open handshake. The negotiated tuning
+// will be stored in the returned connection's Config field.
+type Config struct {
+ // The SASL mechanisms to try in the client request, and the successful
+ // mechanism used on the Connection object.
+ // If SASL is nil, PlainAuth from the URL is used.
+ SASL []Authentication
+
+ // Vhost specifies the namespace of permissions, exchanges, queues and
+ // bindings on the server. Dial sets this to the path parsed from the URL.
+ Vhost string
+
+ ChannelMax int // 0 max channels means 2^16 - 1
+ FrameSize int // 0 max bytes means unlimited
+ Heartbeat time.Duration // less than 1s uses the server's interval
+
+ // TLSClientConfig specifies the client configuration of the TLS connection
+ // when establishing a tls transport.
+ // If the URL uses an amqps scheme, then an empty tls.Config with the
+ // ServerName from the URL is used.
+ TLSClientConfig *tls.Config
+
+ // Properties is table of properties that the client advertises to the server.
+ // This is an optional setting - if the application does not set this,
+ // the underlying library will use a generic set of client properties.
+ Properties Table
+
+ // Connection locale that we expect to always be en_US
+ // Even though servers must return it as per the AMQP 0-9-1 spec,
+ // we are not aware of it being used other than to satisfy the spec requirements
+ Locale string
+
+ // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
+ // then an AMQP connection handshake.
+ // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
+ // used during TLS and AMQP handshaking.
+ Dial func(network, addr string) (net.Conn, error)
+}
+
+// Connection manages the serialization and deserialization of frames from IO
+// and dispatches the frames to the appropriate channel. All RPC methods and
+// asynchronous Publishing, Delivery, Ack, Nack and Return messages are
+// multiplexed on this channel. There must always be active receivers for
+// every asynchronous message on this connection.
+type Connection struct {
+ destructor sync.Once // shutdown once
+ sendM sync.Mutex // conn writer mutex
+ m sync.Mutex // struct field mutex
+
+ conn io.ReadWriteCloser
+
+ rpc chan message
+ writer *writer
+ sends chan time.Time // timestamps of each frame sent
+ deadlines chan readDeadliner // heartbeater updates read deadlines
+
+ allocator *allocator // id generator valid after openTune
+ channels map[uint16]*Channel
+
+ noNotify bool // true when we will never notify again
+ closes []chan *Error
+ blocks []chan Blocking
+
+ errors chan *Error
+
+ Config Config // The negotiated Config after connection.open
+
+ Major int // Server's major version
+ Minor int // Server's minor version
+ Properties Table // Server properties
+ Locales []string // Server locales
+
+ closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
+}
+
+type readDeadliner interface {
+ SetReadDeadline(time.Time) error
+}
+
+// defaultDial establishes a connection when config.Dial is not provided
+func defaultDial(network, addr string) (net.Conn, error) {
+ conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
+ if err != nil {
+ return nil, err
+ }
+
+ // Heartbeating hasn't started yet, don't stall forever on a dead server.
+ // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
+ // the deadline is cleared in openComplete.
+ if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
+ return nil, err
+ }
+
+ return conn, nil
+}
+
+// Dial accepts a string in the AMQP URI format and returns a new Connection
+// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
+// seconds and sets the handshake deadline to 30 seconds. After handshake,
+// deadlines are cleared.
+//
+// Dial uses the zero value of tls.Config when it encounters an amqps://
+// scheme. It is equivalent to calling DialTLS(amqp, nil).
+func Dial(url string) (*Connection, error) {
+ return DialConfig(url, Config{
+ Heartbeat: defaultHeartbeat,
+ Locale: defaultLocale,
+ })
+}
+
+// DialTLS accepts a string in the AMQP URI format and returns a new Connection
+// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
+// seconds and sets the initial read deadline to 30 seconds.
+//
+// DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
+func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
+ return DialConfig(url, Config{
+ Heartbeat: defaultHeartbeat,
+ TLSClientConfig: amqps,
+ Locale: defaultLocale,
+ })
+}
+
+// DialConfig accepts a string in the AMQP URI format and a configuration for
+// the transport and connection setup, returning a new Connection. Defaults to
+// a server heartbeat interval of 10 seconds and sets the initial read deadline
+// to 30 seconds.
+func DialConfig(url string, config Config) (*Connection, error) {
+ var err error
+ var conn net.Conn
+
+ uri, err := ParseURI(url)
+ if err != nil {
+ return nil, err
+ }
+
+ if config.SASL == nil {
+ config.SASL = []Authentication{uri.PlainAuth()}
+ }
+
+ if config.Vhost == "" {
+ config.Vhost = uri.Vhost
+ }
+
+ addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
+
+ dialer := config.Dial
+ if dialer == nil {
+ dialer = defaultDial
+ }
+
+ conn, err = dialer("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+
+ if uri.Scheme == "amqps" {
+ if config.TLSClientConfig == nil {
+ config.TLSClientConfig = new(tls.Config)
+ }
+
+ // If ServerName has not been specified in TLSClientConfig,
+ // set it to the URI host used for this connection.
+ if config.TLSClientConfig.ServerName == "" {
+ config.TLSClientConfig.ServerName = uri.Host
+ }
+
+ client := tls.Client(conn, config.TLSClientConfig)
+ if err := client.Handshake(); err != nil {
+ conn.Close()
+ return nil, err
+ }
+
+ conn = client
+ }
+
+ return Open(conn, config)
+}
+
+/*
+Open accepts an already established connection, or other io.ReadWriteCloser as
+a transport. Use this method if you have established a TLS connection or wish
+to use your own custom transport.
+
+*/
+func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
+ c := &Connection{
+ conn: conn,
+ writer: &writer{bufio.NewWriter(conn)},
+ channels: make(map[uint16]*Channel),
+ rpc: make(chan message),
+ sends: make(chan time.Time),
+ errors: make(chan *Error, 1),
+ deadlines: make(chan readDeadliner, 1),
+ }
+ go c.reader(conn)
+ return c, c.open(config)
+}
+
+/*
+LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
+as a fallback default value if the underlying transport does not support LocalAddr().
+*/
+func (c *Connection) LocalAddr() net.Addr {
+ if conn, ok := c.conn.(interface {
+ LocalAddr() net.Addr
+ }); ok {
+ return conn.LocalAddr()
+ }
+ return &net.TCPAddr{}
+}
+
+// ConnectionState returns basic TLS details of the underlying transport.
+// Returns a zero value when the underlying connection does not implement
+// ConnectionState() tls.ConnectionState.
+func (c *Connection) ConnectionState() tls.ConnectionState {
+ if conn, ok := c.conn.(interface {
+ ConnectionState() tls.ConnectionState
+ }); ok {
+ return conn.ConnectionState()
+ }
+ return tls.ConnectionState{}
+}
+
+/*
+NotifyClose registers a listener for close events either initiated by an error
+accompanying a connection.close method or by a normal shutdown.
+
+On normal shutdowns, the chan will be closed.
+
+To reconnect after a transport or protocol error, register a listener here and
+re-run your setup process.
+
+*/
+func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if c.noNotify {
+ close(receiver)
+ } else {
+ c.closes = append(c.closes, receiver)
+ }
+
+ return receiver
+}
+
+/*
+NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
+method extensions connection.blocked and connection.unblocked. Flow control is
+active with a reason when Blocking.Blocked is true. When a Connection is
+blocked, all methods will block across all connections until server resources
+become free again.
+
+This optional extension is supported by the server when the
+"connection.blocked" server capability key is true.
+
+*/
+func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if c.noNotify {
+ close(receiver)
+ } else {
+ c.blocks = append(c.blocks, receiver)
+ }
+
+ return receiver
+}
+
+/*
+Close requests and waits for the response to close the AMQP connection.
+
+It's advisable to use this message when publishing to ensure all kernel buffers
+have been flushed on the server and client before exiting.
+
+An error indicates that server may not have received this request to close but
+the connection should be treated as closed regardless.
+
+After returning from this call, all resources associated with this connection,
+including the underlying io, Channels, Notify listeners and Channel consumers
+will also be closed.
+*/
+func (c *Connection) Close() error {
+ if c.isClosed() {
+ return ErrClosed
+ }
+
+ defer c.shutdown(nil)
+ return c.call(
+ &connectionClose{
+ ReplyCode: replySuccess,
+ ReplyText: "kthxbai",
+ },
+ &connectionCloseOk{},
+ )
+}
+
+func (c *Connection) closeWith(err *Error) error {
+ if c.isClosed() {
+ return ErrClosed
+ }
+
+ defer c.shutdown(err)
+ return c.call(
+ &connectionClose{
+ ReplyCode: uint16(err.Code),
+ ReplyText: err.Reason,
+ },
+ &connectionCloseOk{},
+ )
+}
+
+func (c *Connection) isClosed() bool {
+ return (atomic.LoadInt32(&c.closed) == 1)
+}
+
+func (c *Connection) send(f frame) error {
+ if c.isClosed() {
+ return ErrClosed
+ }
+
+ c.sendM.Lock()
+ err := c.writer.WriteFrame(f)
+ c.sendM.Unlock()
+
+ if err != nil {
+ // shutdown could be re-entrant from signaling notify chans
+ go c.shutdown(&Error{
+ Code: FrameError,
+ Reason: err.Error(),
+ })
+ } else {
+ // Broadcast we sent a frame, reducing heartbeats, only
+ // if there is something that can receive - like a non-reentrant
+ // call or if the heartbeater isn't running
+ select {
+ case c.sends <- time.Now():
+ default:
+ }
+ }
+
+ return err
+}
+
+func (c *Connection) shutdown(err *Error) {
+ atomic.StoreInt32(&c.closed, 1)
+
+ c.destructor.Do(func() {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if err != nil {
+ for _, c := range c.closes {
+ c <- err
+ }
+ }
+
+ if err != nil {
+ c.errors <- err
+ }
+ // Shutdown handler goroutine can still receive the result.
+ close(c.errors)
+
+ for _, c := range c.closes {
+ close(c)
+ }
+
+ for _, c := range c.blocks {
+ close(c)
+ }
+
+ // Shutdown the channel, but do not use closeChannel() as it calls
+ // releaseChannel() which requires the connection lock.
+ //
+ // Ranging over c.channels and calling releaseChannel() that mutates
+ // c.channels is racy - see commit 6063341 for an example.
+ for _, ch := range c.channels {
+ ch.shutdown(err)
+ }
+
+ c.conn.Close()
+
+ c.channels = map[uint16]*Channel{}
+ c.allocator = newAllocator(1, c.Config.ChannelMax)
+ c.noNotify = true
+ })
+}
+
+// All methods sent to the connection channel should be synchronous so we
+// can handle them directly without a framing component
+func (c *Connection) demux(f frame) {
+ if f.channel() == 0 {
+ c.dispatch0(f)
+ } else {
+ c.dispatchN(f)
+ }
+}
+
+func (c *Connection) dispatch0(f frame) {
+ switch mf := f.(type) {
+ case *methodFrame:
+ switch m := mf.Method.(type) {
+ case *connectionClose:
+ // Send immediately as shutdown will close our side of the writer.
+ c.send(&methodFrame{
+ ChannelId: 0,
+ Method: &connectionCloseOk{},
+ })
+
+ c.shutdown(newError(m.ReplyCode, m.ReplyText))
+ case *connectionBlocked:
+ for _, c := range c.blocks {
+ c <- Blocking{Active: true, Reason: m.Reason}
+ }
+ case *connectionUnblocked:
+ for _, c := range c.blocks {
+ c <- Blocking{Active: false}
+ }
+ default:
+ c.rpc <- m
+ }
+ case *heartbeatFrame:
+ // kthx - all reads reset our deadline. so we can drop this
+ default:
+ // lolwat - channel0 only responds to methods and heartbeats
+ c.closeWith(ErrUnexpectedFrame)
+ }
+}
+
+func (c *Connection) dispatchN(f frame) {
+ c.m.Lock()
+ channel := c.channels[f.channel()]
+ c.m.Unlock()
+
+ if channel != nil {
+ channel.recv(channel, f)
+ } else {
+ c.dispatchClosed(f)
+ }
+}
+
+// section 2.3.7: "When a peer decides to close a channel or connection, it
+// sends a Close method. The receiving peer MUST respond to a Close with a
+// Close-Ok, and then both parties can close their channel or connection. Note
+// that if peers ignore Close, deadlock can happen when both peers send Close
+// at the same time."
+//
+// When we don't have a channel, so we must respond with close-ok on a close
+// method. This can happen between a channel exception on an asynchronous
+// method like basic.publish and a synchronous close with channel.close.
+// In that case, we'll get both a channel.close and channel.close-ok in any
+// order.
+func (c *Connection) dispatchClosed(f frame) {
+ // Only consider method frames, drop content/header frames
+ if mf, ok := f.(*methodFrame); ok {
+ switch mf.Method.(type) {
+ case *channelClose:
+ c.send(&methodFrame{
+ ChannelId: f.channel(),
+ Method: &channelCloseOk{},
+ })
+ case *channelCloseOk:
+ // we are already closed, so do nothing
+ default:
+ // unexpected method on closed channel
+ c.closeWith(ErrClosed)
+ }
+ }
+}
+
+// Reads each frame off the IO and hand off to the connection object that
+// will demux the streams and dispatch to one of the opened channels or
+// handle on channel 0 (the connection channel).
+func (c *Connection) reader(r io.Reader) {
+ buf := bufio.NewReader(r)
+ frames := &reader{buf}
+ conn, haveDeadliner := r.(readDeadliner)
+
+ for {
+ frame, err := frames.ReadFrame()
+
+ if err != nil {
+ c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
+ return
+ }
+
+ c.demux(frame)
+
+ if haveDeadliner {
+ c.deadlines <- conn
+ }
+ }
+}
+
+// Ensures that at least one frame is being sent at the tuned interval with a
+// jitter tolerance of 1s
+func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
+ const maxServerHeartbeatsInFlight = 3
+
+ var sendTicks <-chan time.Time
+ if interval > 0 {
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ sendTicks = ticker.C
+ }
+
+ lastSent := time.Now()
+
+ for {
+ select {
+ case at, stillSending := <-c.sends:
+ // When actively sending, depend on sent frames to reset server timer
+ if stillSending {
+ lastSent = at
+ } else {
+ return
+ }
+
+ case at := <-sendTicks:
+ // When idle, fill the space with a heartbeat frame
+ if at.Sub(lastSent) > interval-time.Second {
+ if err := c.send(&heartbeatFrame{}); err != nil {
+ // send heartbeats even after close/closeOk so we
+ // tick until the connection starts erroring
+ return
+ }
+ }
+
+ case conn := <-c.deadlines:
+ // When reading, reset our side of the deadline, if we've negotiated one with
+ // a deadline that covers at least 2 server heartbeats
+ if interval > 0 {
+ conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
+ }
+
+ case <-done:
+ return
+ }
+ }
+}
+
+// Convenience method to inspect the Connection.Properties["capabilities"]
+// Table for server identified capabilities like "basic.ack" or
+// "confirm.select".
+func (c *Connection) isCapable(featureName string) bool {
+ capabilities, _ := c.Properties["capabilities"].(Table)
+ hasFeature, _ := capabilities[featureName].(bool)
+ return hasFeature
+}
+
+// allocateChannel records but does not open a new channel with a unique id.
+// This method is the initial part of the channel lifecycle and paired with
+// releaseChannel
+func (c *Connection) allocateChannel() (*Channel, error) {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ if c.isClosed() {
+ return nil, ErrClosed
+ }
+
+ id, ok := c.allocator.next()
+ if !ok {
+ return nil, ErrChannelMax
+ }
+
+ ch := newChannel(c, uint16(id))
+ c.channels[uint16(id)] = ch
+
+ return ch, nil
+}
+
+// releaseChannel removes a channel from the registry as the final part of the
+// channel lifecycle
+func (c *Connection) releaseChannel(id uint16) {
+ c.m.Lock()
+ defer c.m.Unlock()
+
+ delete(c.channels, id)
+ c.allocator.release(int(id))
+}
+
+// openChannel allocates and opens a channel, must be paired with closeChannel
+func (c *Connection) openChannel() (*Channel, error) {
+ ch, err := c.allocateChannel()
+ if err != nil {
+ return nil, err
+ }
+
+ if err := ch.open(); err != nil {
+ c.releaseChannel(ch.id)
+ return nil, err
+ }
+ return ch, nil
+}
+
+// closeChannel releases and initiates a shutdown of the channel. All channel
+// closures should be initiated here for proper channel lifecycle management on
+// this connection.
+func (c *Connection) closeChannel(ch *Channel, e *Error) {
+ ch.shutdown(e)
+ c.releaseChannel(ch.id)
+}
+
+/*
+Channel opens a unique, concurrent server channel to process the bulk of AMQP
+messages. Any error from methods on this receiver will render the receiver
+invalid and a new Channel should be opened.
+
+*/
+func (c *Connection) Channel() (*Channel, error) {
+ return c.openChannel()
+}
+
+func (c *Connection) call(req message, res ...message) error {
+ // Special case for when the protocol header frame is sent insted of a
+ // request method
+ if req != nil {
+ if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
+ return err
+ }
+ }
+
+ select {
+ case err, ok := <-c.errors:
+ if !ok {
+ return ErrClosed
+ }
+ return err
+
+ case msg := <-c.rpc:
+ // Try to match one of the result types
+ for _, try := range res {
+ if reflect.TypeOf(msg) == reflect.TypeOf(try) {
+ // *res = *msg
+ vres := reflect.ValueOf(try).Elem()
+ vmsg := reflect.ValueOf(msg).Elem()
+ vres.Set(vmsg)
+ return nil
+ }
+ }
+ return ErrCommandInvalid
+ }
+ // unreachable
+}
+
+// Connection = open-Connection *use-Connection close-Connection
+// open-Connection = C:protocol-header
+// S:START C:START-OK
+// *challenge
+// S:TUNE C:TUNE-OK
+// C:OPEN S:OPEN-OK
+// challenge = S:SECURE C:SECURE-OK
+// use-Connection = *channel
+// close-Connection = C:CLOSE S:CLOSE-OK
+// / S:CLOSE C:CLOSE-OK
+func (c *Connection) open(config Config) error {
+ if err := c.send(&protocolHeader{}); err != nil {
+ return err
+ }
+
+ return c.openStart(config)
+}
+
+func (c *Connection) openStart(config Config) error {
+ start := &connectionStart{}
+
+ if err := c.call(nil, start); err != nil {
+ return err
+ }
+
+ c.Major = int(start.VersionMajor)
+ c.Minor = int(start.VersionMinor)
+ c.Properties = Table(start.ServerProperties)
+ c.Locales = strings.Split(start.Locales, " ")
+
+ // eventually support challenge/response here by also responding to
+ // connectionSecure.
+ auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
+ if !ok {
+ return ErrSASL
+ }
+
+ // Save this mechanism off as the one we chose
+ c.Config.SASL = []Authentication{auth}
+
+ // Set the connection locale to client locale
+ c.Config.Locale = config.Locale
+
+ return c.openTune(config, auth)
+}
+
+func (c *Connection) openTune(config Config, auth Authentication) error {
+ if len(config.Properties) == 0 {
+ config.Properties = Table{
+ "product": defaultProduct,
+ "version": defaultVersion,
+ }
+ }
+
+ config.Properties["capabilities"] = Table{
+ "connection.blocked": true,
+ "consumer_cancel_notify": true,
+ }
+
+ ok := &connectionStartOk{
+ ClientProperties: config.Properties,
+ Mechanism: auth.Mechanism(),
+ Response: auth.Response(),
+ Locale: config.Locale,
+ }
+ tune := &connectionTune{}
+
+ if err := c.call(ok, tune); err != nil {
+ // per spec, a connection can only be closed when it has been opened
+ // so at this point, we know it's an auth error, but the socket
+ // was closed instead. Return a meaningful error.
+ return ErrCredentials
+ }
+
+ // When the server and client both use default 0, then the max channel is
+ // only limited by uint16.
+ c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
+ if c.Config.ChannelMax == 0 {
+ c.Config.ChannelMax = defaultChannelMax
+ }
+ c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
+
+ // Frame size includes headers and end byte (len(payload)+8), even if
+ // this is less than FrameMinSize, use what the server sends because the
+ // alternative is to stop the handshake here.
+ c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
+
+ // Save this off for resetDeadline()
+ c.Config.Heartbeat = time.Second * time.Duration(pick(
+ int(config.Heartbeat/time.Second),
+ int(tune.Heartbeat)))
+
+ // "The client should start sending heartbeats after receiving a
+ // Connection.Tune method"
+ go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1)))
+
+ if err := c.send(&methodFrame{
+ ChannelId: 0,
+ Method: &connectionTuneOk{
+ ChannelMax: uint16(c.Config.ChannelMax),
+ FrameMax: uint32(c.Config.FrameSize),
+ Heartbeat: uint16(c.Config.Heartbeat / time.Second),
+ },
+ }); err != nil {
+ return err
+ }
+
+ return c.openVhost(config)
+}
+
+func (c *Connection) openVhost(config Config) error {
+ req := &connectionOpen{VirtualHost: config.Vhost}
+ res := &connectionOpenOk{}
+
+ if err := c.call(req, res); err != nil {
+ // Cannot be closed yet, but we know it's a vhost problem
+ return ErrVhost
+ }
+
+ c.Config.Vhost = config.Vhost
+
+ return c.openComplete()
+}
+
+// openComplete performs any final Connection initialization dependent on the
+// connection handshake and clears any state needed for TLS and AMQP handshaking.
+func (c *Connection) openComplete() error {
+ // We clear the deadlines and let the heartbeater reset the read deadline if requested.
+ // RabbitMQ uses TCP flow control at this point for pushback so Writes can
+ // intentionally block.
+ if deadliner, ok := c.conn.(interface {
+ SetDeadline(time.Time) error
+ }); ok {
+ _ = deadliner.SetDeadline(time.Time{})
+ }
+
+ c.allocator = newAllocator(1, c.Config.ChannelMax)
+ return nil
+}
+
+func max(a, b int) int {
+ if a > b {
+ return a
+ }
+ return b
+}
+
+func min(a, b int) int {
+ if a < b {
+ return a
+ }
+ return b
+}
+
+func pick(client, server int) int {
+ if client == 0 || server == 0 {
+ return max(client, server)
+ }
+ return min(client, server)
+}