From 4d11ca17d0f73f5bd783f45900118295fdfed46b Mon Sep 17 00:00:00 2001 From: Tomofumi Hayashi Date: Sat, 27 Apr 2019 20:38:39 +0900 Subject: barometer: update DMA's vendoring packages Change-Id: I0578b094f1ecdaed20c906be2ba29d51b8089d7c Signed-off-by: Tomofumi Hayashi --- .../vendor/github.com/streadway/amqp/connection.go | 43 ++++++++++++---------- 1 file changed, 24 insertions(+), 19 deletions(-) (limited to 'src/dma/vendor/github.com/streadway/amqp/connection.go') diff --git a/src/dma/vendor/github.com/streadway/amqp/connection.go b/src/dma/vendor/github.com/streadway/amqp/connection.go index ca1372d0..b9d8e8ee 100644 --- a/src/dma/vendor/github.com/streadway/amqp/connection.go +++ b/src/dma/vendor/github.com/streadway/amqp/connection.go @@ -111,21 +111,23 @@ type readDeadliner interface { SetReadDeadline(time.Time) error } -// defaultDial establishes a connection when config.Dial is not provided -func defaultDial(network, addr string) (net.Conn, error) { - conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout) - if err != nil { - return nil, err - } +// DefaultDial establishes a connection when config.Dial is not provided +func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) { + return func(network, addr string) (net.Conn, error) { + conn, err := net.DialTimeout(network, addr, connectionTimeout) + if err != nil { + return nil, err + } - // Heartbeating hasn't started yet, don't stall forever on a dead server. - // A deadline is set for TLS and AMQP handshaking. After AMQP is established, - // the deadline is cleared in openComplete. - if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil { - return nil, err - } + // Heartbeating hasn't started yet, don't stall forever on a dead server. + // A deadline is set for TLS and AMQP handshaking. After AMQP is established, + // the deadline is cleared in openComplete. + if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil { + return nil, err + } - return conn, nil + return conn, nil + } } // Dial accepts a string in the AMQP URI format and returns a new Connection @@ -180,7 +182,7 @@ func DialConfig(url string, config Config) (*Connection, error) { dialer := config.Dial if dialer == nil { - dialer = defaultDial + dialer = DefaultDial(defaultConnectionTimeout) } conn, err = dialer("tcp", addr) @@ -201,6 +203,7 @@ func DialConfig(url string, config Config) (*Connection, error) { client := tls.Client(conn, config.TLSClientConfig) if err := client.Handshake(); err != nil { + conn.Close() return nil, err } @@ -317,7 +320,7 @@ including the underlying io, Channels, Notify listeners and Channel consumers will also be closed. */ func (c *Connection) Close() error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -332,7 +335,7 @@ func (c *Connection) Close() error { } func (c *Connection) closeWith(err *Error) error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -346,12 +349,14 @@ func (c *Connection) closeWith(err *Error) error { ) } -func (c *Connection) isClosed() bool { +// IsClosed returns true if the connection is marked as closed, otherwise false +// is returned. +func (c *Connection) IsClosed() bool { return (atomic.LoadInt32(&c.closed) == 1) } func (c *Connection) send(f frame) error { - if c.isClosed() { + if c.IsClosed() { return ErrClosed } @@ -591,7 +596,7 @@ func (c *Connection) allocateChannel() (*Channel, error) { c.m.Lock() defer c.m.Unlock() - if c.isClosed() { + if c.IsClosed() { return nil, ErrClosed } -- cgit 1.2.3-korg