diff options
Diffstat (limited to 'Testcases/cfgm_common/vnc_kombu.py')
-rw-r--r-- | Testcases/cfgm_common/vnc_kombu.py | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/Testcases/cfgm_common/vnc_kombu.py b/Testcases/cfgm_common/vnc_kombu.py new file mode 100644 index 0000000..0f00865 --- /dev/null +++ b/Testcases/cfgm_common/vnc_kombu.py @@ -0,0 +1,226 @@ +# +# Copyright (c) 2014 Juniper Networks, Inc. All rights reserved. +# +import re +import amqp.exceptions +import kombu +import gevent +import gevent.monkey +gevent.monkey.patch_all() +import time +from gevent.queue import Queue +try: + from gevent.lock import Semaphore +except ImportError: + # older versions of gevent + from gevent.coros import Semaphore + +from pysandesh.connection_info import ConnectionState +from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \ + ConnectionType +from pysandesh.gen_py.sandesh.ttypes import SandeshLevel + +__all__ = "VncKombuClient" + + +class VncKombuClientBase(object): + def _update_sandesh_status(self, status, msg=''): + ConnectionState.update(conn_type=ConnectionType.DATABASE, + name='RabbitMQ', status=status, message=msg, + server_addrs=["%s:%s" % (self._rabbit_ip, self._rabbit_port)]) + # end _update_sandesh_status + + def publish(self, message): + self._publish_queue.put(message) + # end publish + + def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, + rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger): + self._rabbit_ip = rabbit_ip + self._rabbit_port = rabbit_port + self._rabbit_user = rabbit_user + self._rabbit_password = rabbit_password + self._rabbit_vhost = rabbit_vhost + self._subscribe_cb = subscribe_cb + self._logger = logger + self._publish_queue = Queue() + self._conn_lock = Semaphore() + + self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout', + durable=False) + + def num_pending_messages(self): + return self._publish_queue.qsize() + # end num_pending_messages + + def prepare_to_consume(self): + # override this method + return + + def _reconnect(self, delete_old_q=False): + if self._conn_lock.locked(): + # either connection-monitor or publisher should have taken + # the lock. The one who acquired the lock would re-establish + # the connection and releases the lock, so the other one can + # just wait on the lock, till it gets released + self._conn_lock.wait() + return + + self._conn_lock.acquire() + + msg = "RabbitMQ connection down" + self._logger(msg, level=SandeshLevel.SYS_ERR) + self._update_sandesh_status(ConnectionStatus.DOWN) + self._conn_state = ConnectionStatus.DOWN + + self._conn.close() + + self._conn.ensure_connection() + self._conn.connect() + + self._update_sandesh_status(ConnectionStatus.UP) + self._conn_state = ConnectionStatus.UP + msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn) + self._logger(msg, level=SandeshLevel.SYS_NOTICE) + + self._channel = self._conn.channel() + if delete_old_q: + # delete the old queue in first-connect context + # as db-resync would have caught up with history. + try: + bound_q = self._update_queue_obj(self._channel) + bound_q.delete() + except Exception as e: + msg = 'Unable to delete the old ampq queue: %s' %(str(e)) + self._logger(msg, level=SandeshLevel.SYS_ERR) + + self._consumer = kombu.Consumer(self._channel, + queues=self._update_queue_obj, + callbacks=[self._subscribe]) + self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange) + + self._conn_lock.release() + # end _reconnect + + def _connection_watch(self): + self.prepare_to_consume() + while True: + try: + self._consumer.consume() + self._conn.drain_events() + except self._conn.connection_errors + self._conn.channel_errors as e: + self._reconnect() + # end _connection_watch + + def _publisher(self): + message = None + while True: + try: + if not message: + # earlier was sent fine, dequeue one more + message = self._publish_queue.get() + + while True: + try: + self._producer.publish(message) + message = None + break + except self._conn.connection_errors + self._conn.channel_errors as e: + self._reconnect() + except Exception as e: + log_str = "Unknown exception in _publisher greenlet" + str(e) + self._logger(log_str, level=SandeshLevel.SYS_ERR) + # end _publisher + + def _subscribe(self, body, message): + try: + self._subscribe_cb(body) + finally: + message.ack() + + + def _start(self): + self._reconnect(delete_old_q=True) + + self._publisher_greenlet = gevent.spawn(self._publisher) + self._connection_monitor_greenlet = gevent.spawn(self._connection_watch) + + def shutdown(self): + self._publisher_greenlet.kill() + self._connection_monitor_greenlet.kill() + self._producer.close() + self._consumer.close() + self._conn.close() + + +class VncKombuClientV1(VncKombuClientBase): + def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, + rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger): + super(VncKombuClientV1, self).__init__(rabbit_ip, rabbit_port, + rabbit_user, rabbit_password, + rabbit_vhost, rabbit_ha_mode, + q_name, subscribe_cb, logger) + + self._conn = kombu.Connection(hostname=self._rabbit_ip, + port=self._rabbit_port, + userid=self._rabbit_user, + password=self._rabbit_password, + virtual_host=self._rabbit_vhost) + self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange, durable=False) + self._start() + # end __init__ + + +class VncKombuClientV2(VncKombuClientBase): + def _parse_rabbit_hosts(self, rabbit_hosts): + server_list = rabbit_hosts.split(",") + + default_dict = {'user': self._rabbit_user, + 'password': self._rabbit_password, + 'port': self._rabbit_port} + ret = [] + for s in server_list: + match = re.match("(?:(?P<user>.*?)(?::(?P<password>.*?))*@)*(?P<host>.*?)(?::(?P<port>\d+))*$", s) + if match: + mdict = match.groupdict().copy() + for key in ['user', 'password', 'port']: + if not mdict[key]: + mdict[key] = default_dict[key] + + ret.append(mdict) + + return ret + + def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, + rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger): + super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port, + rabbit_user, rabbit_password, + rabbit_vhost, rabbit_ha_mode, + q_name, subscribe_cb, logger) + + _hosts = self._parse_rabbit_hosts(rabbit_hosts) + self._urls = [] + for h in _hosts: + h['vhost'] = "" if not rabbit_vhost else rabbit_vhost + _url = "pyamqp://%(user)s:%(password)s@%(host)s:%(port)s/%(vhost)s/" % h + self._urls.append(_url) + + msg = "Initializing RabbitMQ connection, urls %s" % self._urls + self._logger(msg, level=SandeshLevel.SYS_NOTICE) + self._update_sandesh_status(ConnectionStatus.INIT) + self._conn_state = ConnectionStatus.INIT + self._conn = kombu.Connection(self._urls) + queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None + self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange, + durable=False, + queue_arguments=queue_args) + + self._start() + # end __init__ + + +from distutils.version import LooseVersion +if LooseVersion(kombu.__version__) >= LooseVersion("2.5.0"): + VncKombuClient = VncKombuClientV2 +else: + VncKombuClient = VncKombuClientV1 |