diff options
Diffstat (limited to 'vstf/vstf/rpc_frame_work')
-rwxr-xr-x | vstf/vstf/rpc_frame_work/__init__.py | 15 | ||||
-rwxr-xr-x | vstf/vstf/rpc_frame_work/constant.py | 13 | ||||
-rwxr-xr-x | vstf/vstf/rpc_frame_work/rpc_consumer.py | 421 | ||||
-rwxr-xr-x | vstf/vstf/rpc_frame_work/rpc_producer.py | 251 |
4 files changed, 700 insertions, 0 deletions
diff --git a/vstf/vstf/rpc_frame_work/__init__.py b/vstf/vstf/rpc_frame_work/__init__.py new file mode 100755 index 00000000..4dc8a6aa --- /dev/null +++ b/vstf/vstf/rpc_frame_work/__init__.py @@ -0,0 +1,15 @@ +# Copyright Huawei Technologies Co., Ltd. 1998-2015. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the License); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + diff --git a/vstf/vstf/rpc_frame_work/constant.py b/vstf/vstf/rpc_frame_work/constant.py new file mode 100755 index 00000000..da555dee --- /dev/null +++ b/vstf/vstf/rpc_frame_work/constant.py @@ -0,0 +1,13 @@ +fan_exchange = "esp_exchange" +exchange_d = "vstf_dexch" +queue_common = "vstf-" +TIMEOUT = 10 +NEVERTIMEOUT = 9999 +FAN = "fanout" +DIRECT = "direct" + +# these for commandline + +NETDEV = "netdevs" +LISTDEVS = "lspci | grep Eth" +LIST_DEVS_NAME = "ls /sys/class/net/" diff --git a/vstf/vstf/rpc_frame_work/rpc_consumer.py b/vstf/vstf/rpc_frame_work/rpc_consumer.py new file mode 100755 index 00000000..f7aacfd6 --- /dev/null +++ b/vstf/vstf/rpc_frame_work/rpc_consumer.py @@ -0,0 +1,421 @@ +#!/usr/bin/env python +# coding=utf-8 +import logging +import time + +import stevedore + +import pika +from vstf.common import message +from vstf.rpc_frame_work import constant + +LOGGER = logging.getLogger(__name__) + + +class VstfConsumer(object): + """This is an example consumer that will handle unexpected interactions + with RabbitMQ such as channel and connection closures. + + If RabbitMQ closes the connection, it will reopen it. You should + look at the output, as there are limited reasons why the connection may + be closed, which usually are tied to permission related issues or + socket timeouts. + + If the channel is closed, it will indicate a problem with one of the + commands that were issued and that should surface in the output as well. + + """ + + def __init__(self, agent, + user='guest', + passwd='guest', + host='localhost', + port='5672', + agent_id="agent"): + """Create a new instance of the consumer class, passing in the AMQP + URL used to connect to RabbitMQ. + + :param str user: The user name of RabbitMQ server + :param str passwd: The passwd of RabbitMQ server + :param str host: The ip of RabbitMQ server + :param str port: connection's port + + """ + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag = None + self.user = user + self.passwd = passwd + self.srv = host + self.port = port + self.agent_id = agent_id + self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F' + + # load the agent_funcs + try: + self.agent_ops = stevedore.driver.DriverManager( + namespace="agent.plugins", + name=agent, + invoke_on_load=True) + except Exception as e: + LOGGER.error(message.dumpstrace()) + raise e + + super(VstfConsumer, self).__init__() + + def connect(self): + """This method connects to RabbitMQ, returning the connection handle. + When the connection is established, the on_connection_open method + will be invoked by pika. + + :rtype: pika.SelectConnection + + """ + LOGGER.info('Connecting to %s:%s', self.srv, self.port) + return pika.SelectConnection(pika.URLParameters(self.url), + self.on_connection_open, + stop_ioloop_on_close=False) + + # return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)), + # self.on_connection_open, + # stop_ioloop_on_close=False) + + def on_connection_open(self, unused_connection): + """This method is called by pika once the connection to RabbitMQ has + been established. It passes the handle to the connection object in + case we need it, but in this case, we'll just mark it unused. + + :type unused_connection: pika.SelectConnection + + """ + LOGGER.info('Connection opened') + self.add_on_connection_close_callback() + self.open_channel() + + def add_on_connection_close_callback(self): + """This method adds an on close callback that will be invoked by pika + when RabbitMQ closes the connection to the publisher unexpectedly. + + """ + LOGGER.info('Adding connection close callback') + self._connection.add_on_close_callback(self.on_connection_closed) + + def on_connection_closed(self, connection, reply_code, reply_text): + """This method is invoked by pika when the connection to RabbitMQ is + closed unexpectedly. Since it is unexpected, we will reconnect to + RabbitMQ if it disconnects. + + :param pika.connection.Connection connection: The closed connection obj + :param int reply_code: The server provided reply_code if given + :param str reply_text: The server provided reply_text if given + + """ + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s', + reply_code, reply_text) + self._connection.add_timeout(2, self.reconnect) + + def reconnect(self): + """Will be invoked by the IOLoop timer if the connection is + closed. See the on_connection_closed method. + + """ + # This is the old connection IOLoop instance, stop its ioloop + # Sometimes the broken connection may be exception + try: + self._connection.ioloop.stop() + except Exception: + pass + + while not self._closing: + # Create a new connection + try: + self._connection = self.connect() + except Exception: + LOGGER.error(message.dumpstrace()) + time.sleep(3) + continue + break + + # There is now a new connection, needs a new ioloop to run + self._connection.ioloop.start() + + def open_channel(self): + """Open a new channel with RabbitMQ by issuing the Channel.Open RPC + command. When RabbitMQ responds that the channel is open, the + on_channel_open callback will be invoked by pika. + + """ + LOGGER.info('Creating a new channel') + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_channel_open(self, channel): + """This method is invoked by pika when the channel has been opened. + The channel object is passed in so we can make use of it. + + Since the channel is now open, we'll declare the exchange to use. + + :param pika.channel.Channel channel: The channel object + + """ + LOGGER.info('Channel opened') + self._channel = channel + self.add_on_channel_close_callback() + self.setup_exchanges() + + def add_on_channel_close_callback(self): + """This method tells pika to call the on_channel_closed method if + RabbitMQ unexpectedly closes the channel. + + """ + LOGGER.info('Adding channel close callback') + self._channel.add_on_close_callback(self.on_channel_closed) + + def on_channel_closed(self, channel, reply_code, reply_text): + """Invoked by pika when RabbitMQ unexpectedly closes the channel. + Channels are usually closed if you attempt to do something that + violates the protocol, such as re-declare an exchange or queue with + different parameters. In this case, we'll close the connection + to shutdown the object. + + :param pika.channel.Channel: The closed channel + :param int reply_code: The numeric reason the channel was closed + :param str reply_text: The text reason the channel was closed + + """ + LOGGER.warning('Channel %i was closed: (%s) %s', + channel, reply_code, reply_text) + self._connection.close() + + def setup_exchanges(self): + """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + command. When it is complete, the on_exchange_declareok method will + be invoked by pika. + + :param str|unicode exchange_name: The name of the exchange to declare + + """ + LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d) + self._channel.exchange_declare(self.on_direct_exchange_declareok, + constant.exchange_d, + constant.DIRECT) + + # LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange) + # self._channel.exchange_declare(self.on_fan_exchange_declareok, + # constant.fan_exchange, + # constant.FAN) + + def on_fan_exchange_declareok(self, unused_frame): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC + command. + + :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame + + """ + LOGGER.info('Exchange declared') + pass + + def on_direct_exchange_declareok(self, unused_frame): + queue_name = constant.queue_common + self.agent_id + self.setup_queue(queue_name, self.on_direct_queue_declareok) + + def setup_queue(self, queue_name, next_ops): + """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC + command. When it is complete, the on_queue_declareok method will + be invoked by pika. + + :param str|unicode queue_name: The name of the queue to declare. + + """ + LOGGER.info('Declaring queue %s', queue_name) + self._channel.queue_declare(next_ops, + queue=queue_name, + exclusive=True) + + def on_direct_queue_declareok(self, method_frame): + """Method invoked by pika when the Queue.Declare RPC call made in + setup_queue has completed. In this method we will bind the queue + and exchange together with the routing key by issuing the Queue.Bind + RPC command. When this command is complete, the on_bindok method will + be invoked by pika. + + :param pika.frame.Method method_frame: The Queue.DeclareOk frame + + """ + queue_name = constant.queue_common + self.agent_id + LOGGER.info('Binding %s to %s with %s', + queue_name, constant.exchange_d, queue_name) + self._channel.queue_bind(self.on_bindok, queue_name, + constant.exchange_d, queue_name) + + def on_bindok(self, unused_frame): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will start consuming messages by calling start_consuming + which will invoke the needed RPC commands to start the process. + + :param pika.frame.Method unused_frame: The Queue.BindOk response frame + + """ + LOGGER.info('Queue bound') + self.start_consuming() + + def start_consuming(self): + """This method sets up the consumer by first calling + add_on_cancel_callback so that the object is notified if RabbitMQ + cancels the consumer. It then issues the Basic.Consume RPC command + which returns the consumer tag that is used to uniquely identify the + consumer with RabbitMQ. We keep the value to use it when we want to + cancel consuming. The on_message method is passed in as a callback pika + will invoke when a message is fully received. + + """ + queue_name = constant.queue_common + self.agent_id + LOGGER.info('Issuing consumer related RPC commands') + self.add_on_cancel_callback() + self._consumer_tag = self._channel.basic_consume(self.on_message, + queue_name) + + def add_on_cancel_callback(self): + """Add a callback that will be invoked if RabbitMQ cancels the consumer + for some reason. If RabbitMQ does cancel the consumer, + on_consumer_cancelled will be invoked by pika. + + """ + LOGGER.info('Adding consumer cancellation callback') + self._channel.add_on_cancel_callback(self.on_consumer_cancelled) + + def on_consumer_cancelled(self, method_frame): + """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer + receiving messages. + + :param pika.frame.Method method_frame: The Basic.Cancel frame + + """ + LOGGER.info('Consumer was cancelled remotely, shutting down: %r', + method_frame) + if self._channel: + self._channel.close() + + def on_message(self, respone_chanl, basic_deliver, properties, body): + """Invoked by pika when a message is delivered from RabbitMQ. The + channel is passed for your convenience. The basic_deliver object that + is passed in carries the exchange, routing key, delivery tag and + a redelivered flag for the message. The properties passed in is an + instance of BasicProperties with the message properties and the body + is the message that was sent. + + :param pika.channel.Channel unused_channel: The channel object + :param pika.Spec.Basic.Deliver: basic_deliver method + :param pika.Spec.BasicProperties: properties + :param str|unicode body: The message body + + """ + LOGGER.info('Received message # %s from %s: %s', + basic_deliver.delivery_tag, properties.app_id, body) + try: + msg = message.decode(body) + head = message.get_context(msg) + main_body = message.get_body(msg) + + LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s", + {'h': head, + 'b': main_body}) + + func = getattr(self.agent_ops.driver, main_body.get('method')) + response = func(**main_body.get('args')) + except Exception as e: + LOGGER.error(message.dumpstrace()) + LOGGER.error("request happend error") + response = {'exception': {'name': e.__class__.__name__, + 'message': e.message, + 'args': e.args}} + finally: + response = message.add_context(response, **head) + LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", + {'h': response.get('head'), 'b': response.get('body')}) + + respone_chanl.basic_publish(exchange=constant.exchange_d, + routing_key=properties.reply_to, + properties=pika.BasicProperties(correlation_id=properties.correlation_id), + body=message.encode(response) + ) + # no matter what happend, tell the mq-server to drop this msg. + + self.acknowledge_message(basic_deliver.delivery_tag) + + def acknowledge_message(self, delivery_tag): + """Acknowledge the message delivery from RabbitMQ by sending a + Basic.Ack RPC method for the delivery tag. + + :param int delivery_tag: The delivery tag from the Basic.Deliver frame + + """ + LOGGER.info('Acknowledging message %s', delivery_tag) + self._channel.basic_ack(delivery_tag) + + def stop_consuming(self): + """Tell RabbitMQ that you would like to stop consuming by sending the + Basic.Cancel RPC command. + + """ + if self._channel: + LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ') + self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) + + def on_cancelok(self, unused_frame): + """This method is invoked by pika when RabbitMQ acknowledges the + cancellation of a consumer. At this point we will close the channel. + This will invoke the on_channel_closed method once the channel has been + closed, which will in-turn close the connection. + + :param pika.frame.Method unused_frame: The Basic.CancelOk frame + + """ + LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer') + self.close_channel() + + def close_channel(self): + """Call to close the channel with RabbitMQ cleanly by issuing the + Channel.Close RPC command. + + """ + LOGGER.info('Closing the channel') + self._channel.close() + + def run(self): + """Run the example consumer by connecting to RabbitMQ and then + starting the IOLoop to block and allow the SelectConnection to operate. + + """ + try: + self._connection = self.connect() + except Exception as e: + LOGGER.error(message.dumpstrace()) + self._connection.ioloop.start() + + def stop(self): + """Cleanly shutdown the connection to RabbitMQ by stopping the consumer + with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok + will be invoked by pika, which will then closing the channel and + connection. The IOLoop is started again because this method is invoked + when CTRL-C is pressed raising a KeyboardInterrupt exception. This + exception stops the IOLoop which needs to be running for pika to + communicate with RabbitMQ. All of the commands issued prior to starting + the IOLoop will be buffered but not processed. + + """ + LOGGER.info('Stopping') + self._closing = True + self.stop_consuming() + self._connection.ioloop.stop() + self.close_connection() + LOGGER.info('Stopped') + + def close_connection(self): + """This method closes the connection to RabbitMQ.""" + LOGGER.info('Closing connection') + self._connection.close() diff --git a/vstf/vstf/rpc_frame_work/rpc_producer.py b/vstf/vstf/rpc_frame_work/rpc_producer.py new file mode 100755 index 00000000..c56c9e5e --- /dev/null +++ b/vstf/vstf/rpc_frame_work/rpc_producer.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python +# coding=utf-8 +import uuid +import json +import time +import exceptions +import logging + +import pika +from vstf.common import message +from vstf.common import excepts +from vstf.rpc_frame_work import constant + +LOG = logging.getLogger(__name__) + + +class RpcProxy(object): + def __init__(self, host, + user='guest', + passwd='guest', + port='5672'): + """create a connection to rabbitmq,direct call and fan call supported. + + """ + # try to create connection of rabbitmq + self._channel = None + self._connection = None + self._queue = str(uuid.uuid4()) + self._consume_tag = None + + self.user = user + self.passwd = passwd + self.srv = host + self.port = port + self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F' + try: + self.connect(host, self.setup_vstf_producer) + except Exception as e: + LOG.error("create connection failed. e:%(e)s", {'e': e}) + raise e + + self.response = None + self.corr_id = None + + def connect(self, host, ok_callback): + """Create a Blocking connection to the rabbitmq-server + + :param str host: the rabbitmq-server's host + :param obj ok_callback: if connect success than do this function + + """ + LOG.info("Connect to the server %s", host) + self._connection = pika.BlockingConnection(pika.URLParameters(self.url)) + if self._connection: + ok_callback() + + def setup_vstf_producer(self): + self.open_channel() + self.create_exchange(constant.exchange_d, constant.DIRECT) + self.bind_queues() + self.start_consumer() + + def open_channel(self): + self._channel = self._connection.channel() + if self._channel: + self._channel.confirm_delivery() + + def create_exchange(self, name, type): + LOG.info("Create %s exchange: %s", type, name) + self._channel.exchange_declare(exchange=name, type=type) + + def bind_queues(self): + LOG.info("Declare queue %s and bind it to exchange %s", + self._queue, constant.exchange_d) + self._channel.queue_declare(queue=self._queue, exclusive=True) + self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue) + + def start_consumer(self): + LOG.info("Start response consumer") + self._consume_tag = self._channel.basic_consume(self.on_response, + no_ack=True, + queue=self._queue) + + def stop_consuming(self): + """Tell RabbitMQ that you would like to stop consuming by sending the + Basic.Cancel RPC command. + + """ + if self._channel: + LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ') + self._channel.basic_cancel(self._consume_tag) + + self.close_channel() + + def close_channel(self): + """Call to close the channel with RabbitMQ cleanly by issuing the + Channel.Close RPC command. + + """ + LOG.info('Closing the channel') + self._channel.close() + + def close_connection(self): + """This method closes the connection to RabbitMQ.""" + LOG.info('Closing connection') + self._connection.close() + + def stop(self): + self.stop_consuming() + self.close_connection() + + def on_response(self, ch, method, props, body): + """this func reciver the msg""" + self.response = None + if self.corr_id == props.correlation_id: + self.response = json.loads(body) + LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", + {'h': self.response.get('head'), 'b': self.response.get('body')}) + else: + LOG.warn("Proxy producer Drop the msg " + "because of the wrong correlation id, %s\n" % body) + + def publish(self, target, corrid, body): + properties = pika.BasicProperties(reply_to=self._queue, + correlation_id=corrid) + LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s" + , constant.exchange_d, target, body) + return self._channel.basic_publish(exchange=constant.exchange_d, + routing_key=target, + mandatory=True, + properties=properties, + body=message.encode(body)) + + def call(self, msg, target='agent', timeout=constant.TIMEOUT): + """send msg to agent by id, this func will wait ack until timeout + :msg the msg to be sent + :id agent's id + :timeout timeout of waiting response + + """ + self.response = None + queue = constant.queue_common + target + # the msg request and respone must be match by corr_id + self.corr_id = str(uuid.uuid4()) + # same msg format + msg = message.add_context(msg, corrid=self.corr_id) + + # send msg to the queue + try: + ret = self.publish(queue, self.corr_id, msg) + except Exception as e: + LOG.error(message.dumpstrace()) + raise excepts.ChannelDie + + # if delivery msg failed. return error + # clean the msg in the queue + if not ret: + LOG.error("productor message delivery failed.") + return "Message can not be deliveryed, please check the connection of agent." + + # wait for response + t_begin = time.time() + while self.response is None: + self._connection.process_data_events() + count = time.time() - t_begin + if count > timeout: + LOG.error("Command timeout!") + # flush the msg of the queue + self._channel.queue_purge(queue=queue) + # self.channel.basic_cancel() + return False + + msg_body = message.get_body(message.decode(self.response)) + + # deal with exceptions + if msg_body \ + and isinstance(msg_body, dict) \ + and msg_body.has_key('exception'): + ename = str(msg_body['exception'].get('name')) + if hasattr(exceptions, ename): + e = getattr(exceptions, ename)() + else: + class CallError(Exception): + pass + + e = CallError() + e.message = str(msg_body['exception'].get('message')) + e.args = msg_body['exception'].get('args') + raise e + else: + return msg_body + + +class Server(object): + def __init__(self, host=None, + user='guest', + passwd='guest', + port='5672'): + super(Server, self).__init__() + # Default use salt's master ip as rabbit rpc server ip + if host is None: + raise Exception("Can not create rpc proxy because of the None rabbitmq server address.") + + self.host = host + self.port = port + self.user = user + self.passwd = passwd + try: + self.proxy = RpcProxy(host=host, + port=port, + user=user, + passwd=passwd) + except Exception as e: + raise e + + def call(self, msg, msg_id, timeout=constant.TIMEOUT): + """when you add a server listen to the rabbit + you must appoint which queue to be listen. + :@queue the queue name. + """ + retry = False + + try: + ret = self.proxy.call(msg, target=msg_id, timeout=timeout) + except excepts.ChannelDie: + # this may be the proxy die, try to reconnect to the rabbit + del self.proxy + self.proxy = RpcProxy(host=self.host, + port=self.port, + user=self.user, + passwd=self.passwd) + if self.proxy is None: + raise excepts.UnsolvableExit + retry = True + + if retry: + # if retry happened except, throw to uplay + ret = self.proxy.call(msg, target=msg_id, timeout=timeout) + + return ret + + def cast(self, msg): + """when you want to send msg to all agent and no reply, use this func""" + LOG.warn("cast not support now.") + + def make_msg(self, method, **kargs): + return {'method': method, + 'args': kargs} + + def close(self): + self.proxy.stop() |