diff options
author | Tomofumi Hayashi <tohayash@redhat.com> | 2019-04-27 20:38:39 +0900 |
---|---|---|
committer | Tomofumi Hayashi <tohayash@redhat.com> | 2019-04-27 20:40:37 +0900 |
commit | 4d11ca17d0f73f5bd783f45900118295fdfed46b (patch) | |
tree | cce8575b02ac850d2b30ec12a5c4083c48e85c6c /src/dma/vendor/github.com/streadway/amqp/connection.go | |
parent | 07e4a96e4996f3d39b92dd601b3ed0d23bfbaa0c (diff) |
barometer: update DMA's vendoring packages
Change-Id: I0578b094f1ecdaed20c906be2ba29d51b8089d7c
Signed-off-by: Tomofumi Hayashi <tohayash@redhat.com>
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/connection.go')
-rw-r--r-- | src/dma/vendor/github.com/streadway/amqp/connection.go | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/connection.go b/src/dma/vendor/github.com/streadway/amqp/connection.go index ca1372d0..b9d8e8ee 100644 --- a/src/dma/vendor/github.com/streadway/amqp/connection.go +++ b/src/dma/vendor/github.com/streadway/amqp/connection.go @@ -111,21 +111,23 @@ type readDeadliner interface { SetReadDeadline(time.Time) error } -// defaultDial establishes a connection when config.Dial is not provided -func defaultDial(network, addr string) (net.Conn, error) { - conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout) - if err != nil { - return nil, err - } +// DefaultDial establishes a connection when config.Dial is not provided +func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) { + return func(network, addr string) (net.Conn, error) { + conn, err := net.DialTimeout(network, addr, connectionTimeout) + if err != nil { + return nil, err + } - // Heartbeating hasn't started yet, don't stall forever on a dead server. - // A deadline is set for TLS and AMQP handshaking. After AMQP is established, - // the deadline is cleared in openComplete. - if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil { - return nil, err - } + // Heartbeating hasn't started yet, don't stall forever on a dead server. + // A deadline is set for TLS and AMQP handshaking. After AMQP is established, + // the deadline is cleared in openComplete. + if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil { + return nil, err + } - return conn, nil + return conn, nil + } } // Dial accepts a string in the AMQP URI format and returns a new Connection @@ -180,7 +182,7 @@ func DialConfig(url string, config Config) (*Connection, error) { dialer := config.Dial if dialer == nil { - dialer = defaultDial + dialer = DefaultDial(defaultConnectionTimeout) } conn, err = dialer("tcp", addr) @@ -201,6 +203,7 @@ func DialConfig(url string, config Config) (*Connection, error) { client := tls.Client(conn, config.TLSClientConfig) if err := client.Handshake(); err != nil { + conn.Close() return nil, err } @@ -317,7 +320,7 @@ including the underlying io, Channels, Notify listeners and Channel consumers will also be closed. */ func (c *Connection) Close() error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -332,7 +335,7 @@ func (c *Connection) Close() error { } func (c *Connection) closeWith(err *Error) error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -346,12 +349,14 @@ func (c *Connection) closeWith(err *Error) error { ) } -func (c *Connection) isClosed() bool { +// IsClosed returns true if the connection is marked as closed, otherwise false +// is returned. +func (c *Connection) IsClosed() bool { return (atomic.LoadInt32(&c.closed) == 1) } func (c *Connection) send(f frame) error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -591,7 +596,7 @@ func (c *Connection) allocateChannel() (*Channel, error) { c.m.Lock() defer c.m.Unlock() - if c.isClosed() { + if c.IsClosed() { return nil, ErrClosed } |