summaryrefslogtreecommitdiffstats
path: root/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py')
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py34
1 files changed, 22 insertions, 12 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py
index abf2a7fc..cb72b45d 100644
--- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py
+++ b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py
@@ -22,6 +22,7 @@ LOG = logging.getLogger(__name__)
class RpcProxy(object):
+
def __init__(self, host,
user='guest',
passwd='guest',
@@ -39,7 +40,8 @@ class RpcProxy(object):
self.passwd = passwd
self.srv = host
self.port = port
- self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
+ self.url = 'amqp://' + self.user + ':' + self.passwd + \
+ '@' + self.srv + ':' + self.port + '/%2F'
try:
self.connect(host, self.setup_vstf_producer)
except Exception as e:
@@ -51,13 +53,14 @@ class RpcProxy(object):
def connect(self, host, ok_callback):
"""Create a Blocking connection to the rabbitmq-server
-
+
:param str host: the rabbitmq-server's host
:param obj ok_callback: if connect success than do this function
-
+
"""
LOG.info("Connect to the server %s", host)
- self._connection = pika.BlockingConnection(pika.URLParameters(self.url))
+ self._connection = pika.BlockingConnection(
+ pika.URLParameters(self.url))
if self._connection:
ok_callback()
@@ -80,7 +83,9 @@ class RpcProxy(object):
LOG.info("Declare queue %s and bind it to exchange %s",
self._queue, constant.exchange_d)
self._channel.queue_declare(queue=self._queue, exclusive=True)
- self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue)
+ self._channel.queue_bind(
+ exchange=constant.exchange_d,
+ queue=self._queue)
def start_consumer(self):
LOG.info("Start response consumer")
@@ -121,8 +126,8 @@ class RpcProxy(object):
self.response = None
if self.corr_id == props.correlation_id:
self.response = json.loads(body)
- LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s",
- {'h': self.response.get('head'), 'b': self.response.get('body')})
+ LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", {
+ 'h': self.response.get('head'), 'b': self.response.get('body')})
else:
LOG.warn("Proxy producer Drop the msg "
"because of the wrong correlation id, %s\n" % body)
@@ -130,8 +135,11 @@ class RpcProxy(object):
def publish(self, target, corrid, body):
properties = pika.BasicProperties(reply_to=self._queue,
correlation_id=corrid)
- LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s"
- , constant.exchange_d, target, body)
+ LOG.debug(
+ "start to publish message to the exchange=%s, target=%s, msg=%s",
+ constant.exchange_d,
+ target,
+ body)
return self._channel.basic_publish(exchange=constant.exchange_d,
routing_key=target,
mandatory=True,
@@ -149,7 +157,7 @@ class RpcProxy(object):
queue = constant.queue_common + target
# the msg request and respone must be match by corr_id
self.corr_id = str(uuid.uuid4())
- # same msg format
+ # same msg format
msg = message.add_context(msg, corrid=self.corr_id)
# send msg to the queue
@@ -182,7 +190,7 @@ class RpcProxy(object):
# deal with exceptions
if msg_body \
and isinstance(msg_body, dict) \
- and msg_body.has_key('exception'):
+ and 'exception' in msg_body:
ename = str(msg_body['exception'].get('name'))
if hasattr(exceptions, ename):
e = getattr(exceptions, ename)()
@@ -199,6 +207,7 @@ class RpcProxy(object):
class Server(object):
+
def __init__(self, host=None,
user='guest',
passwd='guest',
@@ -206,7 +215,8 @@ class Server(object):
super(Server, self).__init__()
# Default use salt's master ip as rabbit rpc server ip
if host is None:
- raise Exception("Can not create rpc proxy because of the None rabbitmq server address.")
+ raise Exception(
+ "Can not create rpc proxy because of the None rabbitmq server address.")
self.host = host
self.port = port