summaryrefslogtreecommitdiffstats
path: root/vstf/vstf/rpc_frame_work/rpc_producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'vstf/vstf/rpc_frame_work/rpc_producer.py')
-rw-r--r--vstf/vstf/rpc_frame_work/rpc_producer.py258
1 files changed, 0 insertions, 258 deletions
diff --git a/vstf/vstf/rpc_frame_work/rpc_producer.py b/vstf/vstf/rpc_frame_work/rpc_producer.py
deleted file mode 100644
index abf2a7fc..00000000
--- a/vstf/vstf/rpc_frame_work/rpc_producer.py
+++ /dev/null
@@ -1,258 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import uuid
-import json
-import time
-import exceptions
-import logging
-
-import pika
-from vstf.common import message
-from vstf.common import excepts
-from vstf.rpc_frame_work import constant
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcProxy(object):
- def __init__(self, host,
- user='guest',
- passwd='guest',
- port='5672'):
- """create a connection to rabbitmq,direct call and fan call supported.
-
- """
- # try to create connection of rabbitmq
- self._channel = None
- self._connection = None
- self._queue = str(uuid.uuid4())
- self._consume_tag = None
-
- self.user = user
- self.passwd = passwd
- self.srv = host
- self.port = port
- self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
- try:
- self.connect(host, self.setup_vstf_producer)
- except Exception as e:
- LOG.error("create connection failed. e:%(e)s", {'e': e})
- raise e
-
- self.response = None
- self.corr_id = None
-
- def connect(self, host, ok_callback):
- """Create a Blocking connection to the rabbitmq-server
-
- :param str host: the rabbitmq-server's host
- :param obj ok_callback: if connect success than do this function
-
- """
- LOG.info("Connect to the server %s", host)
- self._connection = pika.BlockingConnection(pika.URLParameters(self.url))
- if self._connection:
- ok_callback()
-
- def setup_vstf_producer(self):
- self.open_channel()
- self.create_exchange(constant.exchange_d, constant.DIRECT)
- self.bind_queues()
- self.start_consumer()
-
- def open_channel(self):
- self._channel = self._connection.channel()
- if self._channel:
- self._channel.confirm_delivery()
-
- def create_exchange(self, name, type):
- LOG.info("Create %s exchange: %s", type, name)
- self._channel.exchange_declare(exchange=name, type=type)
-
- def bind_queues(self):
- LOG.info("Declare queue %s and bind it to exchange %s",
- self._queue, constant.exchange_d)
- self._channel.queue_declare(queue=self._queue, exclusive=True)
- self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue)
-
- def start_consumer(self):
- LOG.info("Start response consumer")
- self._consume_tag = self._channel.basic_consume(self.on_response,
- no_ack=True,
- queue=self._queue)
-
- def stop_consuming(self):
- """Tell RabbitMQ that you would like to stop consuming by sending the
- Basic.Cancel RPC command.
-
- """
- if self._channel:
- LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
- self._channel.basic_cancel(self._consume_tag)
-
- self.close_channel()
-
- def close_channel(self):
- """Call to close the channel with RabbitMQ cleanly by issuing the
- Channel.Close RPC command.
-
- """
- LOG.info('Closing the channel')
- self._channel.close()
-
- def close_connection(self):
- """This method closes the connection to RabbitMQ."""
- LOG.info('Closing connection')
- self._connection.close()
-
- def stop(self):
- self.stop_consuming()
- self.close_connection()
-
- def on_response(self, ch, method, props, body):
- """this func reciver the msg"""
- self.response = None
- if self.corr_id == props.correlation_id:
- self.response = json.loads(body)
- LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s",
- {'h': self.response.get('head'), 'b': self.response.get('body')})
- else:
- LOG.warn("Proxy producer Drop the msg "
- "because of the wrong correlation id, %s\n" % body)
-
- def publish(self, target, corrid, body):
- properties = pika.BasicProperties(reply_to=self._queue,
- correlation_id=corrid)
- LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s"
- , constant.exchange_d, target, body)
- return self._channel.basic_publish(exchange=constant.exchange_d,
- routing_key=target,
- mandatory=True,
- properties=properties,
- body=message.encode(body))
-
- def call(self, msg, target='agent', timeout=constant.TIMEOUT):
- """send msg to agent by id, this func will wait ack until timeout
- :msg the msg to be sent
- :id agent's id
- :timeout timeout of waiting response
-
- """
- self.response = None
- queue = constant.queue_common + target
- # the msg request and respone must be match by corr_id
- self.corr_id = str(uuid.uuid4())
- # same msg format
- msg = message.add_context(msg, corrid=self.corr_id)
-
- # send msg to the queue
- try:
- ret = self.publish(queue, self.corr_id, msg)
- except Exception as e:
- LOG.error(message.dumpstrace())
- raise excepts.ChannelDie
-
- # if delivery msg failed. return error
- # clean the msg in the queue
- if not ret:
- LOG.error("productor message delivery failed.")
- return "Message can not be deliveryed, please check the connection of agent."
-
- # wait for response
- t_begin = time.time()
- while self.response is None:
- self._connection.process_data_events()
- count = time.time() - t_begin
- if count > timeout:
- LOG.error("Command timeout!")
- # flush the msg of the queue
- self._channel.queue_purge(queue=queue)
- # self.channel.basic_cancel()
- return False
-
- msg_body = message.get_body(message.decode(self.response))
-
- # deal with exceptions
- if msg_body \
- and isinstance(msg_body, dict) \
- and msg_body.has_key('exception'):
- ename = str(msg_body['exception'].get('name'))
- if hasattr(exceptions, ename):
- e = getattr(exceptions, ename)()
- else:
- class CallError(Exception):
- pass
-
- e = CallError()
- e.message = str(msg_body['exception'].get('message'))
- e.args = msg_body['exception'].get('args')
- raise e
- else:
- return msg_body
-
-
-class Server(object):
- def __init__(self, host=None,
- user='guest',
- passwd='guest',
- port='5672'):
- super(Server, self).__init__()
- # Default use salt's master ip as rabbit rpc server ip
- if host is None:
- raise Exception("Can not create rpc proxy because of the None rabbitmq server address.")
-
- self.host = host
- self.port = port
- self.user = user
- self.passwd = passwd
- try:
- self.proxy = RpcProxy(host=host,
- port=port,
- user=user,
- passwd=passwd)
- except Exception as e:
- raise e
-
- def call(self, msg, msg_id, timeout=constant.TIMEOUT):
- """when you add a server listen to the rabbit
- you must appoint which queue to be listen.
- :@queue the queue name.
- """
- retry = False
-
- try:
- ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
- except excepts.ChannelDie:
- # this may be the proxy die, try to reconnect to the rabbit
- del self.proxy
- self.proxy = RpcProxy(host=self.host,
- port=self.port,
- user=self.user,
- passwd=self.passwd)
- if self.proxy is None:
- raise excepts.UnsolvableExit
- retry = True
-
- if retry:
- # if retry happened except, throw to uplay
- ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
-
- return ret
-
- def cast(self, msg):
- """when you want to send msg to all agent and no reply, use this func"""
- LOG.warn("cast not support now.")
-
- def make_msg(self, method, **kargs):
- return {'method': method,
- 'args': kargs}
-
- def close(self):
- self.proxy.stop()