summaryrefslogtreecommitdiffstats
path: root/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py')
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py31
1 files changed, 19 insertions, 12 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py
index fb54e5db..049b86fa 100644
--- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py
+++ b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py
@@ -57,7 +57,8 @@ class VstfConsumer(object):
self.srv = host
self.port = port
self.agent_id = agent_id
- self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
+ self.url = 'amqp://' + self.user + ':' + self.passwd + \
+ '@' + self.srv + ':' + self.port + '/%2F'
# load the agent_funcs
try:
@@ -122,8 +123,10 @@ class VstfConsumer(object):
if self._closing:
self._connection.ioloop.stop()
else:
- LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
- reply_code, reply_text)
+ LOGGER.warning(
+ 'Connection closed, reopening in 2 seconds: (%s) %s',
+ reply_code,
+ reply_text)
self._connection.add_timeout(2, self.reconnect)
def reconnect(self):
@@ -206,7 +209,10 @@ class VstfConsumer(object):
:param str|unicode exchange_name: The name of the exchange to declare
"""
- LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
+ LOGGER.info(
+ 'Declaring %s exchange %s',
+ constant.DIRECT,
+ constant.exchange_d)
self._channel.exchange_declare(self.on_direct_exchange_declareok,
constant.exchange_d,
constant.DIRECT)
@@ -342,14 +348,15 @@ class VstfConsumer(object):
'args': e.args}}
finally:
response = message.add_context(response, **head)
- LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
- {'h': response.get('head'), 'b': response.get('body')})
-
- respone_chanl.basic_publish(exchange=constant.exchange_d,
- routing_key=properties.reply_to,
- properties=pika.BasicProperties(correlation_id=properties.correlation_id),
- body=message.encode(response)
- )
+ LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", {
+ 'h': response.get('head'), 'b': response.get('body')})
+
+ respone_chanl.basic_publish(
+ exchange=constant.exchange_d,
+ routing_key=properties.reply_to,
+ properties=pika.BasicProperties(
+ correlation_id=properties.correlation_id),
+ body=message.encode(response))
# no matter what happend, tell the mq-server to drop this msg.
self.acknowledge_message(basic_deliver.delivery_tag)