summaryrefslogtreecommitdiffstats
path: root/vstf/vstf/rpc_frame_work
diff options
context:
space:
mode:
Diffstat (limited to 'vstf/vstf/rpc_frame_work')
-rwxr-xr-xvstf/vstf/rpc_frame_work/__init__.py15
-rwxr-xr-xvstf/vstf/rpc_frame_work/constant.py13
-rwxr-xr-xvstf/vstf/rpc_frame_work/rpc_consumer.py421
-rwxr-xr-xvstf/vstf/rpc_frame_work/rpc_producer.py251
4 files changed, 700 insertions, 0 deletions
diff --git a/vstf/vstf/rpc_frame_work/__init__.py b/vstf/vstf/rpc_frame_work/__init__.py
new file mode 100755
index 00000000..4dc8a6aa
--- /dev/null
+++ b/vstf/vstf/rpc_frame_work/__init__.py
@@ -0,0 +1,15 @@
+# Copyright Huawei Technologies Co., Ltd. 1998-2015.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the License); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
diff --git a/vstf/vstf/rpc_frame_work/constant.py b/vstf/vstf/rpc_frame_work/constant.py
new file mode 100755
index 00000000..da555dee
--- /dev/null
+++ b/vstf/vstf/rpc_frame_work/constant.py
@@ -0,0 +1,13 @@
+fan_exchange = "esp_exchange"
+exchange_d = "vstf_dexch"
+queue_common = "vstf-"
+TIMEOUT = 10
+NEVERTIMEOUT = 9999
+FAN = "fanout"
+DIRECT = "direct"
+
+# these for commandline
+
+NETDEV = "netdevs"
+LISTDEVS = "lspci | grep Eth"
+LIST_DEVS_NAME = "ls /sys/class/net/"
diff --git a/vstf/vstf/rpc_frame_work/rpc_consumer.py b/vstf/vstf/rpc_frame_work/rpc_consumer.py
new file mode 100755
index 00000000..f7aacfd6
--- /dev/null
+++ b/vstf/vstf/rpc_frame_work/rpc_consumer.py
@@ -0,0 +1,421 @@
+#!/usr/bin/env python
+# coding=utf-8
+import logging
+import time
+
+import stevedore
+
+import pika
+from vstf.common import message
+from vstf.rpc_frame_work import constant
+
+LOGGER = logging.getLogger(__name__)
+
+
+class VstfConsumer(object):
+ """This is an example consumer that will handle unexpected interactions
+ with RabbitMQ such as channel and connection closures.
+
+ If RabbitMQ closes the connection, it will reopen it. You should
+ look at the output, as there are limited reasons why the connection may
+ be closed, which usually are tied to permission related issues or
+ socket timeouts.
+
+ If the channel is closed, it will indicate a problem with one of the
+ commands that were issued and that should surface in the output as well.
+
+ """
+
+ def __init__(self, agent,
+ user='guest',
+ passwd='guest',
+ host='localhost',
+ port='5672',
+ agent_id="agent"):
+ """Create a new instance of the consumer class, passing in the AMQP
+ URL used to connect to RabbitMQ.
+
+ :param str user: The user name of RabbitMQ server
+ :param str passwd: The passwd of RabbitMQ server
+ :param str host: The ip of RabbitMQ server
+ :param str port: connection's port
+
+ """
+ self._connection = None
+ self._channel = None
+ self._closing = False
+ self._consumer_tag = None
+ self.user = user
+ self.passwd = passwd
+ self.srv = host
+ self.port = port
+ self.agent_id = agent_id
+ self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
+
+ # load the agent_funcs
+ try:
+ self.agent_ops = stevedore.driver.DriverManager(
+ namespace="agent.plugins",
+ name=agent,
+ invoke_on_load=True)
+ except Exception as e:
+ LOGGER.error(message.dumpstrace())
+ raise e
+
+ super(VstfConsumer, self).__init__()
+
+ def connect(self):
+ """This method connects to RabbitMQ, returning the connection handle.
+ When the connection is established, the on_connection_open method
+ will be invoked by pika.
+
+ :rtype: pika.SelectConnection
+
+ """
+ LOGGER.info('Connecting to %s:%s', self.srv, self.port)
+ return pika.SelectConnection(pika.URLParameters(self.url),
+ self.on_connection_open,
+ stop_ioloop_on_close=False)
+
+ # return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
+ # self.on_connection_open,
+ # stop_ioloop_on_close=False)
+
+ def on_connection_open(self, unused_connection):
+ """This method is called by pika once the connection to RabbitMQ has
+ been established. It passes the handle to the connection object in
+ case we need it, but in this case, we'll just mark it unused.
+
+ :type unused_connection: pika.SelectConnection
+
+ """
+ LOGGER.info('Connection opened')
+ self.add_on_connection_close_callback()
+ self.open_channel()
+
+ def add_on_connection_close_callback(self):
+ """This method adds an on close callback that will be invoked by pika
+ when RabbitMQ closes the connection to the publisher unexpectedly.
+
+ """
+ LOGGER.info('Adding connection close callback')
+ self._connection.add_on_close_callback(self.on_connection_closed)
+
+ def on_connection_closed(self, connection, reply_code, reply_text):
+ """This method is invoked by pika when the connection to RabbitMQ is
+ closed unexpectedly. Since it is unexpected, we will reconnect to
+ RabbitMQ if it disconnects.
+
+ :param pika.connection.Connection connection: The closed connection obj
+ :param int reply_code: The server provided reply_code if given
+ :param str reply_text: The server provided reply_text if given
+
+ """
+ self._channel = None
+ if self._closing:
+ self._connection.ioloop.stop()
+ else:
+ LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
+ reply_code, reply_text)
+ self._connection.add_timeout(2, self.reconnect)
+
+ def reconnect(self):
+ """Will be invoked by the IOLoop timer if the connection is
+ closed. See the on_connection_closed method.
+
+ """
+ # This is the old connection IOLoop instance, stop its ioloop
+ # Sometimes the broken connection may be exception
+ try:
+ self._connection.ioloop.stop()
+ except Exception:
+ pass
+
+ while not self._closing:
+ # Create a new connection
+ try:
+ self._connection = self.connect()
+ except Exception:
+ LOGGER.error(message.dumpstrace())
+ time.sleep(3)
+ continue
+ break
+
+ # There is now a new connection, needs a new ioloop to run
+ self._connection.ioloop.start()
+
+ def open_channel(self):
+ """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
+ command. When RabbitMQ responds that the channel is open, the
+ on_channel_open callback will be invoked by pika.
+
+ """
+ LOGGER.info('Creating a new channel')
+ self._connection.channel(on_open_callback=self.on_channel_open)
+
+ def on_channel_open(self, channel):
+ """This method is invoked by pika when the channel has been opened.
+ The channel object is passed in so we can make use of it.
+
+ Since the channel is now open, we'll declare the exchange to use.
+
+ :param pika.channel.Channel channel: The channel object
+
+ """
+ LOGGER.info('Channel opened')
+ self._channel = channel
+ self.add_on_channel_close_callback()
+ self.setup_exchanges()
+
+ def add_on_channel_close_callback(self):
+ """This method tells pika to call the on_channel_closed method if
+ RabbitMQ unexpectedly closes the channel.
+
+ """
+ LOGGER.info('Adding channel close callback')
+ self._channel.add_on_close_callback(self.on_channel_closed)
+
+ def on_channel_closed(self, channel, reply_code, reply_text):
+ """Invoked by pika when RabbitMQ unexpectedly closes the channel.
+ Channels are usually closed if you attempt to do something that
+ violates the protocol, such as re-declare an exchange or queue with
+ different parameters. In this case, we'll close the connection
+ to shutdown the object.
+
+ :param pika.channel.Channel: The closed channel
+ :param int reply_code: The numeric reason the channel was closed
+ :param str reply_text: The text reason the channel was closed
+
+ """
+ LOGGER.warning('Channel %i was closed: (%s) %s',
+ channel, reply_code, reply_text)
+ self._connection.close()
+
+ def setup_exchanges(self):
+ """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
+ command. When it is complete, the on_exchange_declareok method will
+ be invoked by pika.
+
+ :param str|unicode exchange_name: The name of the exchange to declare
+
+ """
+ LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
+ self._channel.exchange_declare(self.on_direct_exchange_declareok,
+ constant.exchange_d,
+ constant.DIRECT)
+
+ # LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
+ # self._channel.exchange_declare(self.on_fan_exchange_declareok,
+ # constant.fan_exchange,
+ # constant.FAN)
+
+ def on_fan_exchange_declareok(self, unused_frame):
+ """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
+ command.
+
+ :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
+
+ """
+ LOGGER.info('Exchange declared')
+ pass
+
+ def on_direct_exchange_declareok(self, unused_frame):
+ queue_name = constant.queue_common + self.agent_id
+ self.setup_queue(queue_name, self.on_direct_queue_declareok)
+
+ def setup_queue(self, queue_name, next_ops):
+ """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
+ command. When it is complete, the on_queue_declareok method will
+ be invoked by pika.
+
+ :param str|unicode queue_name: The name of the queue to declare.
+
+ """
+ LOGGER.info('Declaring queue %s', queue_name)
+ self._channel.queue_declare(next_ops,
+ queue=queue_name,
+ exclusive=True)
+
+ def on_direct_queue_declareok(self, method_frame):
+ """Method invoked by pika when the Queue.Declare RPC call made in
+ setup_queue has completed. In this method we will bind the queue
+ and exchange together with the routing key by issuing the Queue.Bind
+ RPC command. When this command is complete, the on_bindok method will
+ be invoked by pika.
+
+ :param pika.frame.Method method_frame: The Queue.DeclareOk frame
+
+ """
+ queue_name = constant.queue_common + self.agent_id
+ LOGGER.info('Binding %s to %s with %s',
+ queue_name, constant.exchange_d, queue_name)
+ self._channel.queue_bind(self.on_bindok, queue_name,
+ constant.exchange_d, queue_name)
+
+ def on_bindok(self, unused_frame):
+ """Invoked by pika when the Queue.Bind method has completed. At this
+ point we will start consuming messages by calling start_consuming
+ which will invoke the needed RPC commands to start the process.
+
+ :param pika.frame.Method unused_frame: The Queue.BindOk response frame
+
+ """
+ LOGGER.info('Queue bound')
+ self.start_consuming()
+
+ def start_consuming(self):
+ """This method sets up the consumer by first calling
+ add_on_cancel_callback so that the object is notified if RabbitMQ
+ cancels the consumer. It then issues the Basic.Consume RPC command
+ which returns the consumer tag that is used to uniquely identify the
+ consumer with RabbitMQ. We keep the value to use it when we want to
+ cancel consuming. The on_message method is passed in as a callback pika
+ will invoke when a message is fully received.
+
+ """
+ queue_name = constant.queue_common + self.agent_id
+ LOGGER.info('Issuing consumer related RPC commands')
+ self.add_on_cancel_callback()
+ self._consumer_tag = self._channel.basic_consume(self.on_message,
+ queue_name)
+
+ def add_on_cancel_callback(self):
+ """Add a callback that will be invoked if RabbitMQ cancels the consumer
+ for some reason. If RabbitMQ does cancel the consumer,
+ on_consumer_cancelled will be invoked by pika.
+
+ """
+ LOGGER.info('Adding consumer cancellation callback')
+ self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
+
+ def on_consumer_cancelled(self, method_frame):
+ """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
+ receiving messages.
+
+ :param pika.frame.Method method_frame: The Basic.Cancel frame
+
+ """
+ LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
+ method_frame)
+ if self._channel:
+ self._channel.close()
+
+ def on_message(self, respone_chanl, basic_deliver, properties, body):
+ """Invoked by pika when a message is delivered from RabbitMQ. The
+ channel is passed for your convenience. The basic_deliver object that
+ is passed in carries the exchange, routing key, delivery tag and
+ a redelivered flag for the message. The properties passed in is an
+ instance of BasicProperties with the message properties and the body
+ is the message that was sent.
+
+ :param pika.channel.Channel unused_channel: The channel object
+ :param pika.Spec.Basic.Deliver: basic_deliver method
+ :param pika.Spec.BasicProperties: properties
+ :param str|unicode body: The message body
+
+ """
+ LOGGER.info('Received message # %s from %s: %s',
+ basic_deliver.delivery_tag, properties.app_id, body)
+ try:
+ msg = message.decode(body)
+ head = message.get_context(msg)
+ main_body = message.get_body(msg)
+
+ LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
+ {'h': head,
+ 'b': main_body})
+
+ func = getattr(self.agent_ops.driver, main_body.get('method'))
+ response = func(**main_body.get('args'))
+ except Exception as e:
+ LOGGER.error(message.dumpstrace())
+ LOGGER.error("request happend error")
+ response = {'exception': {'name': e.__class__.__name__,
+ 'message': e.message,
+ 'args': e.args}}
+ finally:
+ response = message.add_context(response, **head)
+ LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
+ {'h': response.get('head'), 'b': response.get('body')})
+
+ respone_chanl.basic_publish(exchange=constant.exchange_d,
+ routing_key=properties.reply_to,
+ properties=pika.BasicProperties(correlation_id=properties.correlation_id),
+ body=message.encode(response)
+ )
+ # no matter what happend, tell the mq-server to drop this msg.
+
+ self.acknowledge_message(basic_deliver.delivery_tag)
+
+ def acknowledge_message(self, delivery_tag):
+ """Acknowledge the message delivery from RabbitMQ by sending a
+ Basic.Ack RPC method for the delivery tag.
+
+ :param int delivery_tag: The delivery tag from the Basic.Deliver frame
+
+ """
+ LOGGER.info('Acknowledging message %s', delivery_tag)
+ self._channel.basic_ack(delivery_tag)
+
+ def stop_consuming(self):
+ """Tell RabbitMQ that you would like to stop consuming by sending the
+ Basic.Cancel RPC command.
+
+ """
+ if self._channel:
+ LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
+ self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+
+ def on_cancelok(self, unused_frame):
+ """This method is invoked by pika when RabbitMQ acknowledges the
+ cancellation of a consumer. At this point we will close the channel.
+ This will invoke the on_channel_closed method once the channel has been
+ closed, which will in-turn close the connection.
+
+ :param pika.frame.Method unused_frame: The Basic.CancelOk frame
+
+ """
+ LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
+ self.close_channel()
+
+ def close_channel(self):
+ """Call to close the channel with RabbitMQ cleanly by issuing the
+ Channel.Close RPC command.
+
+ """
+ LOGGER.info('Closing the channel')
+ self._channel.close()
+
+ def run(self):
+ """Run the example consumer by connecting to RabbitMQ and then
+ starting the IOLoop to block and allow the SelectConnection to operate.
+
+ """
+ try:
+ self._connection = self.connect()
+ except Exception as e:
+ LOGGER.error(message.dumpstrace())
+ self._connection.ioloop.start()
+
+ def stop(self):
+ """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
+ with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
+ will be invoked by pika, which will then closing the channel and
+ connection. The IOLoop is started again because this method is invoked
+ when CTRL-C is pressed raising a KeyboardInterrupt exception. This
+ exception stops the IOLoop which needs to be running for pika to
+ communicate with RabbitMQ. All of the commands issued prior to starting
+ the IOLoop will be buffered but not processed.
+
+ """
+ LOGGER.info('Stopping')
+ self._closing = True
+ self.stop_consuming()
+ self._connection.ioloop.stop()
+ self.close_connection()
+ LOGGER.info('Stopped')
+
+ def close_connection(self):
+ """This method closes the connection to RabbitMQ."""
+ LOGGER.info('Closing connection')
+ self._connection.close()
diff --git a/vstf/vstf/rpc_frame_work/rpc_producer.py b/vstf/vstf/rpc_frame_work/rpc_producer.py
new file mode 100755
index 00000000..c56c9e5e
--- /dev/null
+++ b/vstf/vstf/rpc_frame_work/rpc_producer.py
@@ -0,0 +1,251 @@
+#!/usr/bin/env python
+# coding=utf-8
+import uuid
+import json
+import time
+import exceptions
+import logging
+
+import pika
+from vstf.common import message
+from vstf.common import excepts
+from vstf.rpc_frame_work import constant
+
+LOG = logging.getLogger(__name__)
+
+
+class RpcProxy(object):
+ def __init__(self, host,
+ user='guest',
+ passwd='guest',
+ port='5672'):
+ """create a connection to rabbitmq,direct call and fan call supported.
+
+ """
+ # try to create connection of rabbitmq
+ self._channel = None
+ self._connection = None
+ self._queue = str(uuid.uuid4())
+ self._consume_tag = None
+
+ self.user = user
+ self.passwd = passwd
+ self.srv = host
+ self.port = port
+ self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
+ try:
+ self.connect(host, self.setup_vstf_producer)
+ except Exception as e:
+ LOG.error("create connection failed. e:%(e)s", {'e': e})
+ raise e
+
+ self.response = None
+ self.corr_id = None
+
+ def connect(self, host, ok_callback):
+ """Create a Blocking connection to the rabbitmq-server
+
+ :param str host: the rabbitmq-server's host
+ :param obj ok_callback: if connect success than do this function
+
+ """
+ LOG.info("Connect to the server %s", host)
+ self._connection = pika.BlockingConnection(pika.URLParameters(self.url))
+ if self._connection:
+ ok_callback()
+
+ def setup_vstf_producer(self):
+ self.open_channel()
+ self.create_exchange(constant.exchange_d, constant.DIRECT)
+ self.bind_queues()
+ self.start_consumer()
+
+ def open_channel(self):
+ self._channel = self._connection.channel()
+ if self._channel:
+ self._channel.confirm_delivery()
+
+ def create_exchange(self, name, type):
+ LOG.info("Create %s exchange: %s", type, name)
+ self._channel.exchange_declare(exchange=name, type=type)
+
+ def bind_queues(self):
+ LOG.info("Declare queue %s and bind it to exchange %s",
+ self._queue, constant.exchange_d)
+ self._channel.queue_declare(queue=self._queue, exclusive=True)
+ self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue)
+
+ def start_consumer(self):
+ LOG.info("Start response consumer")
+ self._consume_tag = self._channel.basic_consume(self.on_response,
+ no_ack=True,
+ queue=self._queue)
+
+ def stop_consuming(self):
+ """Tell RabbitMQ that you would like to stop consuming by sending the
+ Basic.Cancel RPC command.
+
+ """
+ if self._channel:
+ LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
+ self._channel.basic_cancel(self._consume_tag)
+
+ self.close_channel()
+
+ def close_channel(self):
+ """Call to close the channel with RabbitMQ cleanly by issuing the
+ Channel.Close RPC command.
+
+ """
+ LOG.info('Closing the channel')
+ self._channel.close()
+
+ def close_connection(self):
+ """This method closes the connection to RabbitMQ."""
+ LOG.info('Closing connection')
+ self._connection.close()
+
+ def stop(self):
+ self.stop_consuming()
+ self.close_connection()
+
+ def on_response(self, ch, method, props, body):
+ """this func reciver the msg"""
+ self.response = None
+ if self.corr_id == props.correlation_id:
+ self.response = json.loads(body)
+ LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s",
+ {'h': self.response.get('head'), 'b': self.response.get('body')})
+ else:
+ LOG.warn("Proxy producer Drop the msg "
+ "because of the wrong correlation id, %s\n" % body)
+
+ def publish(self, target, corrid, body):
+ properties = pika.BasicProperties(reply_to=self._queue,
+ correlation_id=corrid)
+ LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s"
+ , constant.exchange_d, target, body)
+ return self._channel.basic_publish(exchange=constant.exchange_d,
+ routing_key=target,
+ mandatory=True,
+ properties=properties,
+ body=message.encode(body))
+
+ def call(self, msg, target='agent', timeout=constant.TIMEOUT):
+ """send msg to agent by id, this func will wait ack until timeout
+ :msg the msg to be sent
+ :id agent's id
+ :timeout timeout of waiting response
+
+ """
+ self.response = None
+ queue = constant.queue_common + target
+ # the msg request and respone must be match by corr_id
+ self.corr_id = str(uuid.uuid4())
+ # same msg format
+ msg = message.add_context(msg, corrid=self.corr_id)
+
+ # send msg to the queue
+ try:
+ ret = self.publish(queue, self.corr_id, msg)
+ except Exception as e:
+ LOG.error(message.dumpstrace())
+ raise excepts.ChannelDie
+
+ # if delivery msg failed. return error
+ # clean the msg in the queue
+ if not ret:
+ LOG.error("productor message delivery failed.")
+ return "Message can not be deliveryed, please check the connection of agent."
+
+ # wait for response
+ t_begin = time.time()
+ while self.response is None:
+ self._connection.process_data_events()
+ count = time.time() - t_begin
+ if count > timeout:
+ LOG.error("Command timeout!")
+ # flush the msg of the queue
+ self._channel.queue_purge(queue=queue)
+ # self.channel.basic_cancel()
+ return False
+
+ msg_body = message.get_body(message.decode(self.response))
+
+ # deal with exceptions
+ if msg_body \
+ and isinstance(msg_body, dict) \
+ and msg_body.has_key('exception'):
+ ename = str(msg_body['exception'].get('name'))
+ if hasattr(exceptions, ename):
+ e = getattr(exceptions, ename)()
+ else:
+ class CallError(Exception):
+ pass
+
+ e = CallError()
+ e.message = str(msg_body['exception'].get('message'))
+ e.args = msg_body['exception'].get('args')
+ raise e
+ else:
+ return msg_body
+
+
+class Server(object):
+ def __init__(self, host=None,
+ user='guest',
+ passwd='guest',
+ port='5672'):
+ super(Server, self).__init__()
+ # Default use salt's master ip as rabbit rpc server ip
+ if host is None:
+ raise Exception("Can not create rpc proxy because of the None rabbitmq server address.")
+
+ self.host = host
+ self.port = port
+ self.user = user
+ self.passwd = passwd
+ try:
+ self.proxy = RpcProxy(host=host,
+ port=port,
+ user=user,
+ passwd=passwd)
+ except Exception as e:
+ raise e
+
+ def call(self, msg, msg_id, timeout=constant.TIMEOUT):
+ """when you add a server listen to the rabbit
+ you must appoint which queue to be listen.
+ :@queue the queue name.
+ """
+ retry = False
+
+ try:
+ ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
+ except excepts.ChannelDie:
+ # this may be the proxy die, try to reconnect to the rabbit
+ del self.proxy
+ self.proxy = RpcProxy(host=self.host,
+ port=self.port,
+ user=self.user,
+ passwd=self.passwd)
+ if self.proxy is None:
+ raise excepts.UnsolvableExit
+ retry = True
+
+ if retry:
+ # if retry happened except, throw to uplay
+ ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
+
+ return ret
+
+ def cast(self, msg):
+ """when you want to send msg to all agent and no reply, use this func"""
+ LOG.warn("cast not support now.")
+
+ def make_msg(self, method, **kargs):
+ return {'method': method,
+ 'args': kargs}
+
+ def close(self):
+ self.proxy.stop()