aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/vendor/github.com/streadway/amqp/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/vendor/github.com/streadway/amqp/connection.go')
-rw-r--r--src/dma/vendor/github.com/streadway/amqp/connection.go43
1 files changed, 24 insertions, 19 deletions
diff --git a/src/dma/vendor/github.com/streadway/amqp/connection.go b/src/dma/vendor/github.com/streadway/amqp/connection.go
index ca1372d0..b9d8e8ee 100644
--- a/src/dma/vendor/github.com/streadway/amqp/connection.go
+++ b/src/dma/vendor/github.com/streadway/amqp/connection.go
@@ -111,21 +111,23 @@ type readDeadliner interface {
SetReadDeadline(time.Time) error
}
-// defaultDial establishes a connection when config.Dial is not provided
-func defaultDial(network, addr string) (net.Conn, error) {
- conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
- if err != nil {
- return nil, err
- }
+// DefaultDial establishes a connection when config.Dial is not provided
+func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
+ return func(network, addr string) (net.Conn, error) {
+ conn, err := net.DialTimeout(network, addr, connectionTimeout)
+ if err != nil {
+ return nil, err
+ }
- // Heartbeating hasn't started yet, don't stall forever on a dead server.
- // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
- // the deadline is cleared in openComplete.
- if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
- return nil, err
- }
+ // Heartbeating hasn't started yet, don't stall forever on a dead server.
+ // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
+ // the deadline is cleared in openComplete.
+ if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
+ return nil, err
+ }
- return conn, nil
+ return conn, nil
+ }
}
// Dial accepts a string in the AMQP URI format and returns a new Connection
@@ -180,7 +182,7 @@ func DialConfig(url string, config Config) (*Connection, error) {
dialer := config.Dial
if dialer == nil {
- dialer = defaultDial
+ dialer = DefaultDial(defaultConnectionTimeout)
}
conn, err = dialer("tcp", addr)
@@ -201,6 +203,7 @@ func DialConfig(url string, config Config) (*Connection, error) {
client := tls.Client(conn, config.TLSClientConfig)
if err := client.Handshake(); err != nil {
+
conn.Close()
return nil, err
}
@@ -317,7 +320,7 @@ including the underlying io, Channels, Notify listeners and Channel consumers
will also be closed.
*/
func (c *Connection) Close() error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
@@ -332,7 +335,7 @@ func (c *Connection) Close() error {
}
func (c *Connection) closeWith(err *Error) error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
@@ -346,12 +349,14 @@ func (c *Connection) closeWith(err *Error) error {
)
}
-func (c *Connection) isClosed() bool {
+// IsClosed returns true if the connection is marked as closed, otherwise false
+// is returned.
+func (c *Connection) IsClosed() bool {
return (atomic.LoadInt32(&c.closed) == 1)
}
func (c *Connection) send(f frame) error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
@@ -591,7 +596,7 @@ func (c *Connection) allocateChannel() (*Channel, error) {
c.m.Lock()
defer c.m.Unlock()
- if c.isClosed() {
+ if c.IsClosed() {
return nil, ErrClosed
}