summaryrefslogtreecommitdiffstats
path: root/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work
diff options
context:
space:
mode:
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/rpc_frame_work')
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py8
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py22
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py435
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py268
4 files changed, 0 insertions, 733 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py
deleted file mode 100644
index 83b8d15d..00000000
--- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py
deleted file mode 100644
index 84d0592a..00000000
--- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/constant.py
+++ /dev/null
@@ -1,22 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-fan_exchange = "esp_exchange"
-exchange_d = "vstf_dexch"
-queue_common = "vstf-"
-TIMEOUT = 10
-NEVERTIMEOUT = 9999
-FAN = "fanout"
-DIRECT = "direct"
-
-# these for commandline
-
-NETDEV = "netdevs"
-LISTDEVS = "lspci | grep Eth"
-LIST_DEVS_NAME = "ls /sys/class/net/"
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py
deleted file mode 100644
index 049b86fa..00000000
--- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_consumer.py
+++ /dev/null
@@ -1,435 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import logging
-import time
-
-import stevedore
-
-import pika
-from vstf.common import message
-from vstf.rpc_frame_work import constant
-
-LOGGER = logging.getLogger(__name__)
-
-
-class VstfConsumer(object):
- """This is an example consumer that will handle unexpected interactions
- with RabbitMQ such as channel and connection closures.
-
- If RabbitMQ closes the connection, it will reopen it. You should
- look at the output, as there are limited reasons why the connection may
- be closed, which usually are tied to permission related issues or
- socket timeouts.
-
- If the channel is closed, it will indicate a problem with one of the
- commands that were issued and that should surface in the output as well.
-
- """
-
- def __init__(self, agent,
- user='guest',
- passwd='guest',
- host='localhost',
- port='5672',
- agent_id="agent"):
- """Create a new instance of the consumer class, passing in the AMQP
- URL used to connect to RabbitMQ.
-
- :param str user: The user name of RabbitMQ server
- :param str passwd: The passwd of RabbitMQ server
- :param str host: The ip of RabbitMQ server
- :param str port: connection's port
-
- """
- self._connection = None
- self._channel = None
- self._closing = False
- self._consumer_tag = None
- self.user = user
- self.passwd = passwd
- self.srv = host
- self.port = port
- self.agent_id = agent_id
- self.url = 'amqp://' + self.user + ':' + self.passwd + \
- '@' + self.srv + ':' + self.port + '/%2F'
-
- # load the agent_funcs
- try:
- self.agent_ops = stevedore.driver.DriverManager(
- namespace="agent.plugins",
- name=agent,
- invoke_on_load=True)
- except Exception as e:
- LOGGER.error(message.dumpstrace())
- raise e
-
- super(VstfConsumer, self).__init__()
-
- def connect(self):
- """This method connects to RabbitMQ, returning the connection handle.
- When the connection is established, the on_connection_open method
- will be invoked by pika.
-
- :rtype: pika.SelectConnection
-
- """
- LOGGER.info('Connecting to %s:%s', self.srv, self.port)
- return pika.SelectConnection(pika.URLParameters(self.url),
- self.on_connection_open,
- stop_ioloop_on_close=False)
-
- # return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
- # self.on_connection_open,
- # stop_ioloop_on_close=False)
-
- def on_connection_open(self, unused_connection):
- """This method is called by pika once the connection to RabbitMQ has
- been established. It passes the handle to the connection object in
- case we need it, but in this case, we'll just mark it unused.
-
- :type unused_connection: pika.SelectConnection
-
- """
- LOGGER.info('Connection opened')
- self.add_on_connection_close_callback()
- self.open_channel()
-
- def add_on_connection_close_callback(self):
- """This method adds an on close callback that will be invoked by pika
- when RabbitMQ closes the connection to the publisher unexpectedly.
-
- """
- LOGGER.info('Adding connection close callback')
- self._connection.add_on_close_callback(self.on_connection_closed)
-
- def on_connection_closed(self, connection, reply_code, reply_text):
- """This method is invoked by pika when the connection to RabbitMQ is
- closed unexpectedly. Since it is unexpected, we will reconnect to
- RabbitMQ if it disconnects.
-
- :param pika.connection.Connection connection: The closed connection obj
- :param int reply_code: The server provided reply_code if given
- :param str reply_text: The server provided reply_text if given
-
- """
- self._channel = None
- if self._closing:
- self._connection.ioloop.stop()
- else:
- LOGGER.warning(
- 'Connection closed, reopening in 2 seconds: (%s) %s',
- reply_code,
- reply_text)
- self._connection.add_timeout(2, self.reconnect)
-
- def reconnect(self):
- """Will be invoked by the IOLoop timer if the connection is
- closed. See the on_connection_closed method.
-
- """
- # This is the old connection IOLoop instance, stop its ioloop
- # Sometimes the broken connection may be exception
- try:
- self._connection.ioloop.stop()
- except Exception:
- pass
-
- while not self._closing:
- # Create a new connection
- try:
- self._connection = self.connect()
- except Exception:
- LOGGER.error(message.dumpstrace())
- time.sleep(3)
- continue
- break
-
- # There is now a new connection, needs a new ioloop to run
- self._connection.ioloop.start()
-
- def open_channel(self):
- """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
- command. When RabbitMQ responds that the channel is open, the
- on_channel_open callback will be invoked by pika.
-
- """
- LOGGER.info('Creating a new channel')
- self._connection.channel(on_open_callback=self.on_channel_open)
-
- def on_channel_open(self, channel):
- """This method is invoked by pika when the channel has been opened.
- The channel object is passed in so we can make use of it.
-
- Since the channel is now open, we'll declare the exchange to use.
-
- :param pika.channel.Channel channel: The channel object
-
- """
- LOGGER.info('Channel opened')
- self._channel = channel
- self.add_on_channel_close_callback()
- self.setup_exchanges()
-
- def add_on_channel_close_callback(self):
- """This method tells pika to call the on_channel_closed method if
- RabbitMQ unexpectedly closes the channel.
-
- """
- LOGGER.info('Adding channel close callback')
- self._channel.add_on_close_callback(self.on_channel_closed)
-
- def on_channel_closed(self, channel, reply_code, reply_text):
- """Invoked by pika when RabbitMQ unexpectedly closes the channel.
- Channels are usually closed if you attempt to do something that
- violates the protocol, such as re-declare an exchange or queue with
- different parameters. In this case, we'll close the connection
- to shutdown the object.
-
- :param pika.channel.Channel: The closed channel
- :param int reply_code: The numeric reason the channel was closed
- :param str reply_text: The text reason the channel was closed
-
- """
- LOGGER.warning('Channel %i was closed: (%s) %s',
- channel, reply_code, reply_text)
- self._connection.close()
-
- def setup_exchanges(self):
- """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
- command. When it is complete, the on_exchange_declareok method will
- be invoked by pika.
-
- :param str|unicode exchange_name: The name of the exchange to declare
-
- """
- LOGGER.info(
- 'Declaring %s exchange %s',
- constant.DIRECT,
- constant.exchange_d)
- self._channel.exchange_declare(self.on_direct_exchange_declareok,
- constant.exchange_d,
- constant.DIRECT)
-
- # LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
- # self._channel.exchange_declare(self.on_fan_exchange_declareok,
- # constant.fan_exchange,
- # constant.FAN)
-
- def on_fan_exchange_declareok(self, unused_frame):
- """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
- command.
-
- :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
-
- """
- LOGGER.info('Exchange declared')
- pass
-
- def on_direct_exchange_declareok(self, unused_frame):
- queue_name = constant.queue_common + self.agent_id
- self.setup_queue(queue_name, self.on_direct_queue_declareok)
-
- def setup_queue(self, queue_name, next_ops):
- """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
- command. When it is complete, the on_queue_declareok method will
- be invoked by pika.
-
- :param str|unicode queue_name: The name of the queue to declare.
-
- """
- LOGGER.info('Declaring queue %s', queue_name)
- self._channel.queue_declare(next_ops,
- queue=queue_name,
- exclusive=True)
-
- def on_direct_queue_declareok(self, method_frame):
- """Method invoked by pika when the Queue.Declare RPC call made in
- setup_queue has completed. In this method we will bind the queue
- and exchange together with the routing key by issuing the Queue.Bind
- RPC command. When this command is complete, the on_bindok method will
- be invoked by pika.
-
- :param pika.frame.Method method_frame: The Queue.DeclareOk frame
-
- """
- queue_name = constant.queue_common + self.agent_id
- LOGGER.info('Binding %s to %s with %s',
- queue_name, constant.exchange_d, queue_name)
- self._channel.queue_bind(self.on_bindok, queue_name,
- constant.exchange_d, queue_name)
-
- def on_bindok(self, unused_frame):
- """Invoked by pika when the Queue.Bind method has completed. At this
- point we will start consuming messages by calling start_consuming
- which will invoke the needed RPC commands to start the process.
-
- :param pika.frame.Method unused_frame: The Queue.BindOk response frame
-
- """
- LOGGER.info('Queue bound')
- self.start_consuming()
-
- def start_consuming(self):
- """This method sets up the consumer by first calling
- add_on_cancel_callback so that the object is notified if RabbitMQ
- cancels the consumer. It then issues the Basic.Consume RPC command
- which returns the consumer tag that is used to uniquely identify the
- consumer with RabbitMQ. We keep the value to use it when we want to
- cancel consuming. The on_message method is passed in as a callback pika
- will invoke when a message is fully received.
-
- """
- queue_name = constant.queue_common + self.agent_id
- LOGGER.info('Issuing consumer related RPC commands')
- self.add_on_cancel_callback()
- self._consumer_tag = self._channel.basic_consume(self.on_message,
- queue_name)
-
- def add_on_cancel_callback(self):
- """Add a callback that will be invoked if RabbitMQ cancels the consumer
- for some reason. If RabbitMQ does cancel the consumer,
- on_consumer_cancelled will be invoked by pika.
-
- """
- LOGGER.info('Adding consumer cancellation callback')
- self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
-
- def on_consumer_cancelled(self, method_frame):
- """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
- receiving messages.
-
- :param pika.frame.Method method_frame: The Basic.Cancel frame
-
- """
- LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
- method_frame)
- if self._channel:
- self._channel.close()
-
- def on_message(self, respone_chanl, basic_deliver, properties, body):
- """Invoked by pika when a message is delivered from RabbitMQ. The
- channel is passed for your convenience. The basic_deliver object that
- is passed in carries the exchange, routing key, delivery tag and
- a redelivered flag for the message. The properties passed in is an
- instance of BasicProperties with the message properties and the body
- is the message that was sent.
-
- :param pika.channel.Channel unused_channel: The channel object
- :param pika.Spec.Basic.Deliver: basic_deliver method
- :param pika.Spec.BasicProperties: properties
- :param str|unicode body: The message body
-
- """
- LOGGER.info('Received message # %s from %s: %s',
- basic_deliver.delivery_tag, properties.app_id, body)
- try:
- msg = message.decode(body)
- head = message.get_context(msg)
- main_body = message.get_body(msg)
-
- LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
- {'h': head,
- 'b': main_body})
-
- func = getattr(self.agent_ops.driver, main_body.get('method'))
- response = func(**main_body.get('args'))
- except Exception as e:
- LOGGER.error(message.dumpstrace())
- LOGGER.error("request happend error")
- response = {'exception': {'name': e.__class__.__name__,
- 'message': e.message,
- 'args': e.args}}
- finally:
- response = message.add_context(response, **head)
- LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", {
- 'h': response.get('head'), 'b': response.get('body')})
-
- respone_chanl.basic_publish(
- exchange=constant.exchange_d,
- routing_key=properties.reply_to,
- properties=pika.BasicProperties(
- correlation_id=properties.correlation_id),
- body=message.encode(response))
- # no matter what happend, tell the mq-server to drop this msg.
-
- self.acknowledge_message(basic_deliver.delivery_tag)
-
- def acknowledge_message(self, delivery_tag):
- """Acknowledge the message delivery from RabbitMQ by sending a
- Basic.Ack RPC method for the delivery tag.
-
- :param int delivery_tag: The delivery tag from the Basic.Deliver frame
-
- """
- LOGGER.info('Acknowledging message %s', delivery_tag)
- self._channel.basic_ack(delivery_tag)
-
- def stop_consuming(self):
- """Tell RabbitMQ that you would like to stop consuming by sending the
- Basic.Cancel RPC command.
-
- """
- if self._channel:
- LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
- self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
-
- def on_cancelok(self, unused_frame):
- """This method is invoked by pika when RabbitMQ acknowledges the
- cancellation of a consumer. At this point we will close the channel.
- This will invoke the on_channel_closed method once the channel has been
- closed, which will in-turn close the connection.
-
- :param pika.frame.Method unused_frame: The Basic.CancelOk frame
-
- """
- LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
- self.close_channel()
-
- def close_channel(self):
- """Call to close the channel with RabbitMQ cleanly by issuing the
- Channel.Close RPC command.
-
- """
- LOGGER.info('Closing the channel')
- self._channel.close()
-
- def run(self):
- """Run the example consumer by connecting to RabbitMQ and then
- starting the IOLoop to block and allow the SelectConnection to operate.
-
- """
- try:
- self._connection = self.connect()
- except Exception as e:
- LOGGER.error(message.dumpstrace())
- self._connection.ioloop.start()
-
- def stop(self):
- """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
- with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
- will be invoked by pika, which will then closing the channel and
- connection. The IOLoop is started again because this method is invoked
- when CTRL-C is pressed raising a KeyboardInterrupt exception. This
- exception stops the IOLoop which needs to be running for pika to
- communicate with RabbitMQ. All of the commands issued prior to starting
- the IOLoop will be buffered but not processed.
-
- """
- LOGGER.info('Stopping')
- self._closing = True
- self.stop_consuming()
- self._connection.ioloop.stop()
- self.close_connection()
- LOGGER.info('Stopped')
-
- def close_connection(self):
- """This method closes the connection to RabbitMQ."""
- LOGGER.info('Closing connection')
- self._connection.close()
diff --git a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py b/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py
deleted file mode 100644
index cb72b45d..00000000
--- a/testsuites/vstf/vstf_scripts/vstf/rpc_frame_work/rpc_producer.py
+++ /dev/null
@@ -1,268 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import uuid
-import json
-import time
-import exceptions
-import logging
-
-import pika
-from vstf.common import message
-from vstf.common import excepts
-from vstf.rpc_frame_work import constant
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcProxy(object):
-
- def __init__(self, host,
- user='guest',
- passwd='guest',
- port='5672'):
- """create a connection to rabbitmq,direct call and fan call supported.
-
- """
- # try to create connection of rabbitmq
- self._channel = None
- self._connection = None
- self._queue = str(uuid.uuid4())
- self._consume_tag = None
-
- self.user = user
- self.passwd = passwd
- self.srv = host
- self.port = port
- self.url = 'amqp://' + self.user + ':' + self.passwd + \
- '@' + self.srv + ':' + self.port + '/%2F'
- try:
- self.connect(host, self.setup_vstf_producer)
- except Exception as e:
- LOG.error("create connection failed. e:%(e)s", {'e': e})
- raise e
-
- self.response = None
- self.corr_id = None
-
- def connect(self, host, ok_callback):
- """Create a Blocking connection to the rabbitmq-server
-
- :param str host: the rabbitmq-server's host
- :param obj ok_callback: if connect success than do this function
-
- """
- LOG.info("Connect to the server %s", host)
- self._connection = pika.BlockingConnection(
- pika.URLParameters(self.url))
- if self._connection:
- ok_callback()
-
- def setup_vstf_producer(self):
- self.open_channel()
- self.create_exchange(constant.exchange_d, constant.DIRECT)
- self.bind_queues()
- self.start_consumer()
-
- def open_channel(self):
- self._channel = self._connection.channel()
- if self._channel:
- self._channel.confirm_delivery()
-
- def create_exchange(self, name, type):
- LOG.info("Create %s exchange: %s", type, name)
- self._channel.exchange_declare(exchange=name, type=type)
-
- def bind_queues(self):
- LOG.info("Declare queue %s and bind it to exchange %s",
- self._queue, constant.exchange_d)
- self._channel.queue_declare(queue=self._queue, exclusive=True)
- self._channel.queue_bind(
- exchange=constant.exchange_d,
- queue=self._queue)
-
- def start_consumer(self):
- LOG.info("Start response consumer")
- self._consume_tag = self._channel.basic_consume(self.on_response,
- no_ack=True,
- queue=self._queue)
-
- def stop_consuming(self):
- """Tell RabbitMQ that you would like to stop consuming by sending the
- Basic.Cancel RPC command.
-
- """
- if self._channel:
- LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
- self._channel.basic_cancel(self._consume_tag)
-
- self.close_channel()
-
- def close_channel(self):
- """Call to close the channel with RabbitMQ cleanly by issuing the
- Channel.Close RPC command.
-
- """
- LOG.info('Closing the channel')
- self._channel.close()
-
- def close_connection(self):
- """This method closes the connection to RabbitMQ."""
- LOG.info('Closing connection')
- self._connection.close()
-
- def stop(self):
- self.stop_consuming()
- self.close_connection()
-
- def on_response(self, ch, method, props, body):
- """this func reciver the msg"""
- self.response = None
- if self.corr_id == props.correlation_id:
- self.response = json.loads(body)
- LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", {
- 'h': self.response.get('head'), 'b': self.response.get('body')})
- else:
- LOG.warn("Proxy producer Drop the msg "
- "because of the wrong correlation id, %s\n" % body)
-
- def publish(self, target, corrid, body):
- properties = pika.BasicProperties(reply_to=self._queue,
- correlation_id=corrid)
- LOG.debug(
- "start to publish message to the exchange=%s, target=%s, msg=%s",
- constant.exchange_d,
- target,
- body)
- return self._channel.basic_publish(exchange=constant.exchange_d,
- routing_key=target,
- mandatory=True,
- properties=properties,
- body=message.encode(body))
-
- def call(self, msg, target='agent', timeout=constant.TIMEOUT):
- """send msg to agent by id, this func will wait ack until timeout
- :msg the msg to be sent
- :id agent's id
- :timeout timeout of waiting response
-
- """
- self.response = None
- queue = constant.queue_common + target
- # the msg request and respone must be match by corr_id
- self.corr_id = str(uuid.uuid4())
- # same msg format
- msg = message.add_context(msg, corrid=self.corr_id)
-
- # send msg to the queue
- try:
- ret = self.publish(queue, self.corr_id, msg)
- except Exception as e:
- LOG.error(message.dumpstrace())
- raise excepts.ChannelDie
-
- # if delivery msg failed. return error
- # clean the msg in the queue
- if not ret:
- LOG.error("productor message delivery failed.")
- return "Message can not be deliveryed, please check the connection of agent."
-
- # wait for response
- t_begin = time.time()
- while self.response is None:
- self._connection.process_data_events()
- count = time.time() - t_begin
- if count > timeout:
- LOG.error("Command timeout!")
- # flush the msg of the queue
- self._channel.queue_purge(queue=queue)
- # self.channel.basic_cancel()
- return False
-
- msg_body = message.get_body(message.decode(self.response))
-
- # deal with exceptions
- if msg_body \
- and isinstance(msg_body, dict) \
- and 'exception' in msg_body:
- ename = str(msg_body['exception'].get('name'))
- if hasattr(exceptions, ename):
- e = getattr(exceptions, ename)()
- else:
- class CallError(Exception):
- pass
-
- e = CallError()
- e.message = str(msg_body['exception'].get('message'))
- e.args = msg_body['exception'].get('args')
- raise e
- else:
- return msg_body
-
-
-class Server(object):
-
- def __init__(self, host=None,
- user='guest',
- passwd='guest',
- port='5672'):
- super(Server, self).__init__()
- # Default use salt's master ip as rabbit rpc server ip
- if host is None:
- raise Exception(
- "Can not create rpc proxy because of the None rabbitmq server address.")
-
- self.host = host
- self.port = port
- self.user = user
- self.passwd = passwd
- try:
- self.proxy = RpcProxy(host=host,
- port=port,
- user=user,
- passwd=passwd)
- except Exception as e:
- raise e
-
- def call(self, msg, msg_id, timeout=constant.TIMEOUT):
- """when you add a server listen to the rabbit
- you must appoint which queue to be listen.
- :@queue the queue name.
- """
- retry = False
-
- try:
- ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
- except excepts.ChannelDie:
- # this may be the proxy die, try to reconnect to the rabbit
- del self.proxy
- self.proxy = RpcProxy(host=self.host,
- port=self.port,
- user=self.user,
- passwd=self.passwd)
- if self.proxy is None:
- raise excepts.UnsolvableExit
- retry = True
-
- if retry:
- # if retry happened except, throw to uplay
- ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
-
- return ret
-
- def cast(self, msg):
- """when you want to send msg to all agent and no reply, use this func"""
- LOG.warn("cast not support now.")
-
- def make_msg(self, method, **kargs):
- return {'method': method,
- 'args': kargs}
-
- def close(self):
- self.proxy.stop()