summaryrefslogtreecommitdiffstats
path: root/Testcases/cfgm_common/vnc_kombu.py
diff options
context:
space:
mode:
Diffstat (limited to 'Testcases/cfgm_common/vnc_kombu.py')
-rw-r--r--Testcases/cfgm_common/vnc_kombu.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/Testcases/cfgm_common/vnc_kombu.py b/Testcases/cfgm_common/vnc_kombu.py
new file mode 100644
index 0000000..0f00865
--- /dev/null
+++ b/Testcases/cfgm_common/vnc_kombu.py
@@ -0,0 +1,226 @@
+#
+# Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
+#
+import re
+import amqp.exceptions
+import kombu
+import gevent
+import gevent.monkey
+gevent.monkey.patch_all()
+import time
+from gevent.queue import Queue
+try:
+ from gevent.lock import Semaphore
+except ImportError:
+ # older versions of gevent
+ from gevent.coros import Semaphore
+
+from pysandesh.connection_info import ConnectionState
+from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \
+ ConnectionType
+from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
+
+__all__ = "VncKombuClient"
+
+
+class VncKombuClientBase(object):
+ def _update_sandesh_status(self, status, msg=''):
+ ConnectionState.update(conn_type=ConnectionType.DATABASE,
+ name='RabbitMQ', status=status, message=msg,
+ server_addrs=["%s:%s" % (self._rabbit_ip, self._rabbit_port)])
+ # end _update_sandesh_status
+
+ def publish(self, message):
+ self._publish_queue.put(message)
+ # end publish
+
+ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
+ rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
+ self._rabbit_ip = rabbit_ip
+ self._rabbit_port = rabbit_port
+ self._rabbit_user = rabbit_user
+ self._rabbit_password = rabbit_password
+ self._rabbit_vhost = rabbit_vhost
+ self._subscribe_cb = subscribe_cb
+ self._logger = logger
+ self._publish_queue = Queue()
+ self._conn_lock = Semaphore()
+
+ self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
+ durable=False)
+
+ def num_pending_messages(self):
+ return self._publish_queue.qsize()
+ # end num_pending_messages
+
+ def prepare_to_consume(self):
+ # override this method
+ return
+
+ def _reconnect(self, delete_old_q=False):
+ if self._conn_lock.locked():
+ # either connection-monitor or publisher should have taken
+ # the lock. The one who acquired the lock would re-establish
+ # the connection and releases the lock, so the other one can
+ # just wait on the lock, till it gets released
+ self._conn_lock.wait()
+ return
+
+ self._conn_lock.acquire()
+
+ msg = "RabbitMQ connection down"
+ self._logger(msg, level=SandeshLevel.SYS_ERR)
+ self._update_sandesh_status(ConnectionStatus.DOWN)
+ self._conn_state = ConnectionStatus.DOWN
+
+ self._conn.close()
+
+ self._conn.ensure_connection()
+ self._conn.connect()
+
+ self._update_sandesh_status(ConnectionStatus.UP)
+ self._conn_state = ConnectionStatus.UP
+ msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn)
+ self._logger(msg, level=SandeshLevel.SYS_NOTICE)
+
+ self._channel = self._conn.channel()
+ if delete_old_q:
+ # delete the old queue in first-connect context
+ # as db-resync would have caught up with history.
+ try:
+ bound_q = self._update_queue_obj(self._channel)
+ bound_q.delete()
+ except Exception as e:
+ msg = 'Unable to delete the old ampq queue: %s' %(str(e))
+ self._logger(msg, level=SandeshLevel.SYS_ERR)
+
+ self._consumer = kombu.Consumer(self._channel,
+ queues=self._update_queue_obj,
+ callbacks=[self._subscribe])
+ self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange)
+
+ self._conn_lock.release()
+ # end _reconnect
+
+ def _connection_watch(self):
+ self.prepare_to_consume()
+ while True:
+ try:
+ self._consumer.consume()
+ self._conn.drain_events()
+ except self._conn.connection_errors + self._conn.channel_errors as e:
+ self._reconnect()
+ # end _connection_watch
+
+ def _publisher(self):
+ message = None
+ while True:
+ try:
+ if not message:
+ # earlier was sent fine, dequeue one more
+ message = self._publish_queue.get()
+
+ while True:
+ try:
+ self._producer.publish(message)
+ message = None
+ break
+ except self._conn.connection_errors + self._conn.channel_errors as e:
+ self._reconnect()
+ except Exception as e:
+ log_str = "Unknown exception in _publisher greenlet" + str(e)
+ self._logger(log_str, level=SandeshLevel.SYS_ERR)
+ # end _publisher
+
+ def _subscribe(self, body, message):
+ try:
+ self._subscribe_cb(body)
+ finally:
+ message.ack()
+
+
+ def _start(self):
+ self._reconnect(delete_old_q=True)
+
+ self._publisher_greenlet = gevent.spawn(self._publisher)
+ self._connection_monitor_greenlet = gevent.spawn(self._connection_watch)
+
+ def shutdown(self):
+ self._publisher_greenlet.kill()
+ self._connection_monitor_greenlet.kill()
+ self._producer.close()
+ self._consumer.close()
+ self._conn.close()
+
+
+class VncKombuClientV1(VncKombuClientBase):
+ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
+ rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
+ super(VncKombuClientV1, self).__init__(rabbit_ip, rabbit_port,
+ rabbit_user, rabbit_password,
+ rabbit_vhost, rabbit_ha_mode,
+ q_name, subscribe_cb, logger)
+
+ self._conn = kombu.Connection(hostname=self._rabbit_ip,
+ port=self._rabbit_port,
+ userid=self._rabbit_user,
+ password=self._rabbit_password,
+ virtual_host=self._rabbit_vhost)
+ self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange, durable=False)
+ self._start()
+ # end __init__
+
+
+class VncKombuClientV2(VncKombuClientBase):
+ def _parse_rabbit_hosts(self, rabbit_hosts):
+ server_list = rabbit_hosts.split(",")
+
+ default_dict = {'user': self._rabbit_user,
+ 'password': self._rabbit_password,
+ 'port': self._rabbit_port}
+ ret = []
+ for s in server_list:
+ match = re.match("(?:(?P<user>.*?)(?::(?P<password>.*?))*@)*(?P<host>.*?)(?::(?P<port>\d+))*$", s)
+ if match:
+ mdict = match.groupdict().copy()
+ for key in ['user', 'password', 'port']:
+ if not mdict[key]:
+ mdict[key] = default_dict[key]
+
+ ret.append(mdict)
+
+ return ret
+
+ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
+ rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
+ super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port,
+ rabbit_user, rabbit_password,
+ rabbit_vhost, rabbit_ha_mode,
+ q_name, subscribe_cb, logger)
+
+ _hosts = self._parse_rabbit_hosts(rabbit_hosts)
+ self._urls = []
+ for h in _hosts:
+ h['vhost'] = "" if not rabbit_vhost else rabbit_vhost
+ _url = "pyamqp://%(user)s:%(password)s@%(host)s:%(port)s/%(vhost)s/" % h
+ self._urls.append(_url)
+
+ msg = "Initializing RabbitMQ connection, urls %s" % self._urls
+ self._logger(msg, level=SandeshLevel.SYS_NOTICE)
+ self._update_sandesh_status(ConnectionStatus.INIT)
+ self._conn_state = ConnectionStatus.INIT
+ self._conn = kombu.Connection(self._urls)
+ queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None
+ self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange,
+ durable=False,
+ queue_arguments=queue_args)
+
+ self._start()
+ # end __init__
+
+
+from distutils.version import LooseVersion
+if LooseVersion(kombu.__version__) >= LooseVersion("2.5.0"):
+ VncKombuClient = VncKombuClientV2
+else:
+ VncKombuClient = VncKombuClientV1