diff options
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/rpc_frame_work')
4 files changed, 0 insertions, 733 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py deleted file mode 100644 index 83b8d15d..00000000 --- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py deleted file mode 100644 index 84d0592a..00000000 --- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py +++ /dev/null @@ -1,22 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -fan_exchange = "esp_exchange" -exchange_d = "vstf_dexch" -queue_common = "vstf-" -TIMEOUT = 10 -NEVERTIMEOUT = 9999 -FAN = "fanout" -DIRECT = "direct" - -# these for commandline - -NETDEV = "netdevs" -LISTDEVS = "lspci | grep Eth" -LIST_DEVS_NAME = "ls /sys/class/net/" diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py deleted file mode 100644 index 049b86fa..00000000 --- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py +++ /dev/null @@ -1,435 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -import logging -import time - -import stevedore - -import pika -from vstf.common import message -from vstf.rpc_frame_work import constant - -LOGGER = logging.getLogger(__name__) - - -class VstfConsumer(object): - """This is an example consumer that will handle unexpected interactions - with RabbitMQ such as channel and connection closures. - - If RabbitMQ closes the connection, it will reopen it. You should - look at the output, as there are limited reasons why the connection may - be closed, which usually are tied to permission related issues or - socket timeouts. - - If the channel is closed, it will indicate a problem with one of the - commands that were issued and that should surface in the output as well. - - """ - - def __init__(self, agent, - user='guest', - passwd='guest', - host='localhost', - port='5672', - agent_id="agent"): - """Create a new instance of the consumer class, passing in the AMQP - URL used to connect to RabbitMQ. - - :param str user: The user name of RabbitMQ server - :param str passwd: The passwd of RabbitMQ server - :param str host: The ip of RabbitMQ server - :param str port: connection's port - - """ - self._connection = None - self._channel = None - self._closing = False - self._consumer_tag = None - self.user = user - self.passwd = passwd - self.srv = host - self.port = port - self.agent_id = agent_id - self.url = 'amqp://' + self.user + ':' + self.passwd + \ - '@' + self.srv + ':' + self.port + '/%2F' - - # load the agent_funcs - try: - self.agent_ops = stevedore.driver.DriverManager( - namespace="agent.plugins", - name=agent, - invoke_on_load=True) - except Exception as e: - LOGGER.error(message.dumpstrace()) - raise e - - super(VstfConsumer, self).__init__() - - def connect(self): - """This method connects to RabbitMQ, returning the connection handle. - When the connection is established, the on_connection_open method - will be invoked by pika. - - :rtype: pika.SelectConnection - - """ - LOGGER.info('Connecting to %s:%s', self.srv, self.port) - return pika.SelectConnection(pika.URLParameters(self.url), - self.on_connection_open, - stop_ioloop_on_close=False) - - # return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)), - # self.on_connection_open, - # stop_ioloop_on_close=False) - - def on_connection_open(self, unused_connection): - """This method is called by pika once the connection to RabbitMQ has - been established. It passes the handle to the connection object in - case we need it, but in this case, we'll just mark it unused. - - :type unused_connection: pika.SelectConnection - - """ - LOGGER.info('Connection opened') - self.add_on_connection_close_callback() - self.open_channel() - - def add_on_connection_close_callback(self): - """This method adds an on close callback that will be invoked by pika - when RabbitMQ closes the connection to the publisher unexpectedly. - - """ - LOGGER.info('Adding connection close callback') - self._connection.add_on_close_callback(self.on_connection_closed) - - def on_connection_closed(self, connection, reply_code, reply_text): - """This method is invoked by pika when the connection to RabbitMQ is - closed unexpectedly. Since it is unexpected, we will reconnect to - RabbitMQ if it disconnects. - - :param pika.connection.Connection connection: The closed connection obj - :param int reply_code: The server provided reply_code if given - :param str reply_text: The server provided reply_text if given - - """ - self._channel = None - if self._closing: - self._connection.ioloop.stop() - else: - LOGGER.warning( - 'Connection closed, reopening in 2 seconds: (%s) %s', - reply_code, - reply_text) - self._connection.add_timeout(2, self.reconnect) - - def reconnect(self): - """Will be invoked by the IOLoop timer if the connection is - closed. See the on_connection_closed method. - - """ - # This is the old connection IOLoop instance, stop its ioloop - # Sometimes the broken connection may be exception - try: - self._connection.ioloop.stop() - except Exception: - pass - - while not self._closing: - # Create a new connection - try: - self._connection = self.connect() - except Exception: - LOGGER.error(message.dumpstrace()) - time.sleep(3) - continue - break - - # There is now a new connection, needs a new ioloop to run - self._connection.ioloop.start() - - def open_channel(self): - """Open a new channel with RabbitMQ by issuing the Channel.Open RPC - command. When RabbitMQ responds that the channel is open, the - on_channel_open callback will be invoked by pika. - - """ - LOGGER.info('Creating a new channel') - self._connection.channel(on_open_callback=self.on_channel_open) - - def on_channel_open(self, channel): - """This method is invoked by pika when the channel has been opened. - The channel object is passed in so we can make use of it. - - Since the channel is now open, we'll declare the exchange to use. - - :param pika.channel.Channel channel: The channel object - - """ - LOGGER.info('Channel opened') - self._channel = channel - self.add_on_channel_close_callback() - self.setup_exchanges() - - def add_on_channel_close_callback(self): - """This method tells pika to call the on_channel_closed method if - RabbitMQ unexpectedly closes the channel. - - """ - LOGGER.info('Adding channel close callback') - self._channel.add_on_close_callback(self.on_channel_closed) - - def on_channel_closed(self, channel, reply_code, reply_text): - """Invoked by pika when RabbitMQ unexpectedly closes the channel. - Channels are usually closed if you attempt to do something that - violates the protocol, such as re-declare an exchange or queue with - different parameters. In this case, we'll close the connection - to shutdown the object. - - :param pika.channel.Channel: The closed channel - :param int reply_code: The numeric reason the channel was closed - :param str reply_text: The text reason the channel was closed - - """ - LOGGER.warning('Channel %i was closed: (%s) %s', - channel, reply_code, reply_text) - self._connection.close() - - def setup_exchanges(self): - """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC - command. When it is complete, the on_exchange_declareok method will - be invoked by pika. - - :param str|unicode exchange_name: The name of the exchange to declare - - """ - LOGGER.info( - 'Declaring %s exchange %s', - constant.DIRECT, - constant.exchange_d) - self._channel.exchange_declare(self.on_direct_exchange_declareok, - constant.exchange_d, - constant.DIRECT) - - # LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange) - # self._channel.exchange_declare(self.on_fan_exchange_declareok, - # constant.fan_exchange, - # constant.FAN) - - def on_fan_exchange_declareok(self, unused_frame): - """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC - command. - - :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame - - """ - LOGGER.info('Exchange declared') - pass - - def on_direct_exchange_declareok(self, unused_frame): - queue_name = constant.queue_common + self.agent_id - self.setup_queue(queue_name, self.on_direct_queue_declareok) - - def setup_queue(self, queue_name, next_ops): - """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC - command. When it is complete, the on_queue_declareok method will - be invoked by pika. - - :param str|unicode queue_name: The name of the queue to declare. - - """ - LOGGER.info('Declaring queue %s', queue_name) - self._channel.queue_declare(next_ops, - queue=queue_name, - exclusive=True) - - def on_direct_queue_declareok(self, method_frame): - """Method invoked by pika when the Queue.Declare RPC call made in - setup_queue has completed. In this method we will bind the queue - and exchange together with the routing key by issuing the Queue.Bind - RPC command. When this command is complete, the on_bindok method will - be invoked by pika. - - :param pika.frame.Method method_frame: The Queue.DeclareOk frame - - """ - queue_name = constant.queue_common + self.agent_id - LOGGER.info('Binding %s to %s with %s', - queue_name, constant.exchange_d, queue_name) - self._channel.queue_bind(self.on_bindok, queue_name, - constant.exchange_d, queue_name) - - def on_bindok(self, unused_frame): - """Invoked by pika when the Queue.Bind method has completed. At this - point we will start consuming messages by calling start_consuming - which will invoke the needed RPC commands to start the process. - - :param pika.frame.Method unused_frame: The Queue.BindOk response frame - - """ - LOGGER.info('Queue bound') - self.start_consuming() - - def start_consuming(self): - """This method sets up the consumer by first calling - add_on_cancel_callback so that the object is notified if RabbitMQ - cancels the consumer. It then issues the Basic.Consume RPC command - which returns the consumer tag that is used to uniquely identify the - consumer with RabbitMQ. We keep the value to use it when we want to - cancel consuming. The on_message method is passed in as a callback pika - will invoke when a message is fully received. - - """ - queue_name = constant.queue_common + self.agent_id - LOGGER.info('Issuing consumer related RPC commands') - self.add_on_cancel_callback() - self._consumer_tag = self._channel.basic_consume(self.on_message, - queue_name) - - def add_on_cancel_callback(self): - """Add a callback that will be invoked if RabbitMQ cancels the consumer - for some reason. If RabbitMQ does cancel the consumer, - on_consumer_cancelled will be invoked by pika. - - """ - LOGGER.info('Adding consumer cancellation callback') - self._channel.add_on_cancel_callback(self.on_consumer_cancelled) - - def on_consumer_cancelled(self, method_frame): - """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer - receiving messages. - - :param pika.frame.Method method_frame: The Basic.Cancel frame - - """ - LOGGER.info('Consumer was cancelled remotely, shutting down: %r', - method_frame) - if self._channel: - self._channel.close() - - def on_message(self, respone_chanl, basic_deliver, properties, body): - """Invoked by pika when a message is delivered from RabbitMQ. The - channel is passed for your convenience. The basic_deliver object that - is passed in carries the exchange, routing key, delivery tag and - a redelivered flag for the message. The properties passed in is an - instance of BasicProperties with the message properties and the body - is the message that was sent. - - :param pika.channel.Channel unused_channel: The channel object - :param pika.Spec.Basic.Deliver: basic_deliver method - :param pika.Spec.BasicProperties: properties - :param str|unicode body: The message body - - """ - LOGGER.info('Received message # %s from %s: %s', - basic_deliver.delivery_tag, properties.app_id, body) - try: - msg = message.decode(body) - head = message.get_context(msg) - main_body = message.get_body(msg) - - LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s", - {'h': head, - 'b': main_body}) - - func = getattr(self.agent_ops.driver, main_body.get('method')) - response = func(**main_body.get('args')) - except Exception as e: - LOGGER.error(message.dumpstrace()) - LOGGER.error("request happend error") - response = {'exception': {'name': e.__class__.__name__, - 'message': e.message, - 'args': e.args}} - finally: - response = message.add_context(response, **head) - LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", { - 'h': response.get('head'), 'b': response.get('body')}) - - respone_chanl.basic_publish( - exchange=constant.exchange_d, - routing_key=properties.reply_to, - properties=pika.BasicProperties( - correlation_id=properties.correlation_id), - body=message.encode(response)) - # no matter what happend, tell the mq-server to drop this msg. - - self.acknowledge_message(basic_deliver.delivery_tag) - - def acknowledge_message(self, delivery_tag): - """Acknowledge the message delivery from RabbitMQ by sending a - Basic.Ack RPC method for the delivery tag. - - :param int delivery_tag: The delivery tag from the Basic.Deliver frame - - """ - LOGGER.info('Acknowledging message %s', delivery_tag) - self._channel.basic_ack(delivery_tag) - - def stop_consuming(self): - """Tell RabbitMQ that you would like to stop consuming by sending the - Basic.Cancel RPC command. - - """ - if self._channel: - LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ') - self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) - - def on_cancelok(self, unused_frame): - """This method is invoked by pika when RabbitMQ acknowledges the - cancellation of a consumer. At this point we will close the channel. - This will invoke the on_channel_closed method once the channel has been - closed, which will in-turn close the connection. - - :param pika.frame.Method unused_frame: The Basic.CancelOk frame - - """ - LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer') - self.close_channel() - - def close_channel(self): - """Call to close the channel with RabbitMQ cleanly by issuing the - Channel.Close RPC command. - - """ - LOGGER.info('Closing the channel') - self._channel.close() - - def run(self): - """Run the example consumer by connecting to RabbitMQ and then - starting the IOLoop to block and allow the SelectConnection to operate. - - """ - try: - self._connection = self.connect() - except Exception as e: - LOGGER.error(message.dumpstrace()) - self._connection.ioloop.start() - - def stop(self): - """Cleanly shutdown the connection to RabbitMQ by stopping the consumer - with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok - will be invoked by pika, which will then closing the channel and - connection. The IOLoop is started again because this method is invoked - when CTRL-C is pressed raising a KeyboardInterrupt exception. This - exception stops the IOLoop which needs to be running for pika to - communicate with RabbitMQ. All of the commands issued prior to starting - the IOLoop will be buffered but not processed. - - """ - LOGGER.info('Stopping') - self._closing = True - self.stop_consuming() - self._connection.ioloop.stop() - self.close_connection() - LOGGER.info('Stopped') - - def close_connection(self): - """This method closes the connection to RabbitMQ.""" - LOGGER.info('Closing connection') - self._connection.close() diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py deleted file mode 100644 index cb72b45d..00000000 --- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py +++ /dev/null @@ -1,268 +0,0 @@ -############################################################################## -# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## - -import uuid -import json -import time -import exceptions -import logging - -import pika -from vstf.common import message -from vstf.common import excepts -from vstf.rpc_frame_work import constant - -LOG = logging.getLogger(__name__) - - -class RpcProxy(object): - - def __init__(self, host, - user='guest', - passwd='guest', - port='5672'): - """create a connection to rabbitmq,direct call and fan call supported. - - """ - # try to create connection of rabbitmq - self._channel = None - self._connection = None - self._queue = str(uuid.uuid4()) - self._consume_tag = None - - self.user = user - self.passwd = passwd - self.srv = host - self.port = port - self.url = 'amqp://' + self.user + ':' + self.passwd + \ - '@' + self.srv + ':' + self.port + '/%2F' - try: - self.connect(host, self.setup_vstf_producer) - except Exception as e: - LOG.error("create connection failed. e:%(e)s", {'e': e}) - raise e - - self.response = None - self.corr_id = None - - def connect(self, host, ok_callback): - """Create a Blocking connection to the rabbitmq-server - - :param str host: the rabbitmq-server's host - :param obj ok_callback: if connect success than do this function - - """ - LOG.info("Connect to the server %s", host) - self._connection = pika.BlockingConnection( - pika.URLParameters(self.url)) - if self._connection: - ok_callback() - - def setup_vstf_producer(self): - self.open_channel() - self.create_exchange(constant.exchange_d, constant.DIRECT) - self.bind_queues() - self.start_consumer() - - def open_channel(self): - self._channel = self._connection.channel() - if self._channel: - self._channel.confirm_delivery() - - def create_exchange(self, name, type): - LOG.info("Create %s exchange: %s", type, name) - self._channel.exchange_declare(exchange=name, type=type) - - def bind_queues(self): - LOG.info("Declare queue %s and bind it to exchange %s", - self._queue, constant.exchange_d) - self._channel.queue_declare(queue=self._queue, exclusive=True) - self._channel.queue_bind( - exchange=constant.exchange_d, - queue=self._queue) - - def start_consumer(self): - LOG.info("Start response consumer") - self._consume_tag = self._channel.basic_consume(self.on_response, - no_ack=True, - queue=self._queue) - - def stop_consuming(self): - """Tell RabbitMQ that you would like to stop consuming by sending the - Basic.Cancel RPC command. - - """ - if self._channel: - LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ') - self._channel.basic_cancel(self._consume_tag) - - self.close_channel() - - def close_channel(self): - """Call to close the channel with RabbitMQ cleanly by issuing the - Channel.Close RPC command. - - """ - LOG.info('Closing the channel') - self._channel.close() - - def close_connection(self): - """This method closes the connection to RabbitMQ.""" - LOG.info('Closing connection') - self._connection.close() - - def stop(self): - self.stop_consuming() - self.close_connection() - - def on_response(self, ch, method, props, body): - """this func reciver the msg""" - self.response = None - if self.corr_id == props.correlation_id: - self.response = json.loads(body) - LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", { - 'h': self.response.get('head'), 'b': self.response.get('body')}) - else: - LOG.warn("Proxy producer Drop the msg " - "because of the wrong correlation id, %s\n" % body) - - def publish(self, target, corrid, body): - properties = pika.BasicProperties(reply_to=self._queue, - correlation_id=corrid) - LOG.debug( - "start to publish message to the exchange=%s, target=%s, msg=%s", - constant.exchange_d, - target, - body) - return self._channel.basic_publish(exchange=constant.exchange_d, - routing_key=target, - mandatory=True, - properties=properties, - body=message.encode(body)) - - def call(self, msg, target='agent', timeout=constant.TIMEOUT): - """send msg to agent by id, this func will wait ack until timeout - :msg the msg to be sent - :id agent's id - :timeout timeout of waiting response - - """ - self.response = None - queue = constant.queue_common + target - # the msg request and respone must be match by corr_id - self.corr_id = str(uuid.uuid4()) - # same msg format - msg = message.add_context(msg, corrid=self.corr_id) - - # send msg to the queue - try: - ret = self.publish(queue, self.corr_id, msg) - except Exception as e: - LOG.error(message.dumpstrace()) - raise excepts.ChannelDie - - # if delivery msg failed. return error - # clean the msg in the queue - if not ret: - LOG.error("productor message delivery failed.") - return "Message can not be deliveryed, please check the connection of agent." - - # wait for response - t_begin = time.time() - while self.response is None: - self._connection.process_data_events() - count = time.time() - t_begin - if count > timeout: - LOG.error("Command timeout!") - # flush the msg of the queue - self._channel.queue_purge(queue=queue) - # self.channel.basic_cancel() - return False - - msg_body = message.get_body(message.decode(self.response)) - - # deal with exceptions - if msg_body \ - and isinstance(msg_body, dict) \ - and 'exception' in msg_body: - ename = str(msg_body['exception'].get('name')) - if hasattr(exceptions, ename): - e = getattr(exceptions, ename)() - else: - class CallError(Exception): - pass - - e = CallError() - e.message = str(msg_body['exception'].get('message')) - e.args = msg_body['exception'].get('args') - raise e - else: - return msg_body - - -class Server(object): - - def __init__(self, host=None, - user='guest', - passwd='guest', - port='5672'): - super(Server, self).__init__() - # Default use salt's master ip as rabbit rpc server ip - if host is None: - raise Exception( - "Can not create rpc proxy because of the None rabbitmq server address.") - - self.host = host - self.port = port - self.user = user - self.passwd = passwd - try: - self.proxy = RpcProxy(host=host, - port=port, - user=user, - passwd=passwd) - except Exception as e: - raise e - - def call(self, msg, msg_id, timeout=constant.TIMEOUT): - """when you add a server listen to the rabbit - you must appoint which queue to be listen. - :@queue the queue name. - """ - retry = False - - try: - ret = self.proxy.call(msg, target=msg_id, timeout=timeout) - except excepts.ChannelDie: - # this may be the proxy die, try to reconnect to the rabbit - del self.proxy - self.proxy = RpcProxy(host=self.host, - port=self.port, - user=self.user, - passwd=self.passwd) - if self.proxy is None: - raise excepts.UnsolvableExit - retry = True - - if retry: - # if retry happened except, throw to uplay - ret = self.proxy.call(msg, target=msg_id, timeout=timeout) - - return ret - - def cast(self, msg): - """when you want to send msg to all agent and no reply, use this func""" - LOG.warn("cast not support now.") - - def make_msg(self, method, **kargs): - return {'method': method, - 'args': kargs} - - def close(self): - self.proxy.stop() |