diff options
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py')
-rw-r--r-- | testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py index abf2a7fc..cb72b45d 100644 --- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py +++ b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py @@ -22,6 +22,7 @@ LOG = logging.getLogger(__name__) class RpcProxy(object): + def __init__(self, host, user='guest', passwd='guest', @@ -39,7 +40,8 @@ class RpcProxy(object): self.passwd = passwd self.srv = host self.port = port - self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F' + self.url = 'amqp://' + self.user + ':' + self.passwd + \ + '@' + self.srv + ':' + self.port + '/%2F' try: self.connect(host, self.setup_vstf_producer) except Exception as e: @@ -51,13 +53,14 @@ class RpcProxy(object): def connect(self, host, ok_callback): """Create a Blocking connection to the rabbitmq-server - + :param str host: the rabbitmq-server's host :param obj ok_callback: if connect success than do this function - + """ LOG.info("Connect to the server %s", host) - self._connection = pika.BlockingConnection(pika.URLParameters(self.url)) + self._connection = pika.BlockingConnection( + pika.URLParameters(self.url)) if self._connection: ok_callback() @@ -80,7 +83,9 @@ class RpcProxy(object): LOG.info("Declare queue %s and bind it to exchange %s", self._queue, constant.exchange_d) self._channel.queue_declare(queue=self._queue, exclusive=True) - self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue) + self._channel.queue_bind( + exchange=constant.exchange_d, + queue=self._queue) def start_consumer(self): LOG.info("Start response consumer") @@ -121,8 +126,8 @@ class RpcProxy(object): self.response = None if self.corr_id == props.correlation_id: self.response = json.loads(body) - LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", - {'h': self.response.get('head'), 'b': self.response.get('body')}) + LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", { + 'h': self.response.get('head'), 'b': self.response.get('body')}) else: LOG.warn("Proxy producer Drop the msg " "because of the wrong correlation id, %s\n" % body) @@ -130,8 +135,11 @@ class RpcProxy(object): def publish(self, target, corrid, body): properties = pika.BasicProperties(reply_to=self._queue, correlation_id=corrid) - LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s" - , constant.exchange_d, target, body) + LOG.debug( + "start to publish message to the exchange=%s, target=%s, msg=%s", + constant.exchange_d, + target, + body) return self._channel.basic_publish(exchange=constant.exchange_d, routing_key=target, mandatory=True, @@ -149,7 +157,7 @@ class RpcProxy(object): queue = constant.queue_common + target # the msg request and respone must be match by corr_id self.corr_id = str(uuid.uuid4()) - # same msg format + # same msg format msg = message.add_context(msg, corrid=self.corr_id) # send msg to the queue @@ -182,7 +190,7 @@ class RpcProxy(object): # deal with exceptions if msg_body \ and isinstance(msg_body, dict) \ - and msg_body.has_key('exception'): + and 'exception' in msg_body: ename = str(msg_body['exception'].get('name')) if hasattr(exceptions, ename): e = getattr(exceptions, ename)() @@ -199,6 +207,7 @@ class RpcProxy(object): class Server(object): + def __init__(self, host=None, user='guest', passwd='guest', @@ -206,7 +215,8 @@ class Server(object): super(Server, self).__init__() # Default use salt's master ip as rabbit rpc server ip if host is None: - raise Exception("Can not create rpc proxy because of the None rabbitmq server address.") + raise Exception( + "Can not create rpc proxy because of the None rabbitmq server address.") self.host = host self.port = port |