aboutsummaryrefslogtreecommitdiffstats
path: root/Testcases/cfgm_common/vnc_kombu.py
diff options
context:
space:
mode:
Diffstat (limited to 'Testcases/cfgm_common/vnc_kombu.py')
-rw-r--r--Testcases/cfgm_common/vnc_kombu.py226
1 files changed, 0 insertions, 226 deletions
diff --git a/Testcases/cfgm_common/vnc_kombu.py b/Testcases/cfgm_common/vnc_kombu.py
deleted file mode 100644
index 0f00865..0000000
--- a/Testcases/cfgm_common/vnc_kombu.py
+++ /dev/null
@@ -1,226 +0,0 @@
-#
-# Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
-#
-import re
-import amqp.exceptions
-import kombu
-import gevent
-import gevent.monkey
-gevent.monkey.patch_all()
-import time
-from gevent.queue import Queue
-try:
- from gevent.lock import Semaphore
-except ImportError:
- # older versions of gevent
- from gevent.coros import Semaphore
-
-from pysandesh.connection_info import ConnectionState
-from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \
- ConnectionType
-from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
-
-__all__ = "VncKombuClient"
-
-
-class VncKombuClientBase(object):
- def _update_sandesh_status(self, status, msg=''):
- ConnectionState.update(conn_type=ConnectionType.DATABASE,
- name='RabbitMQ', status=status, message=msg,
- server_addrs=["%s:%s" % (self._rabbit_ip, self._rabbit_port)])
- # end _update_sandesh_status
-
- def publish(self, message):
- self._publish_queue.put(message)
- # end publish
-
- def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
- rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
- self._rabbit_ip = rabbit_ip
- self._rabbit_port = rabbit_port
- self._rabbit_user = rabbit_user
- self._rabbit_password = rabbit_password
- self._rabbit_vhost = rabbit_vhost
- self._subscribe_cb = subscribe_cb
- self._logger = logger
- self._publish_queue = Queue()
- self._conn_lock = Semaphore()
-
- self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
- durable=False)
-
- def num_pending_messages(self):
- return self._publish_queue.qsize()
- # end num_pending_messages
-
- def prepare_to_consume(self):
- # override this method
- return
-
- def _reconnect(self, delete_old_q=False):
- if self._conn_lock.locked():
- # either connection-monitor or publisher should have taken
- # the lock. The one who acquired the lock would re-establish
- # the connection and releases the lock, so the other one can
- # just wait on the lock, till it gets released
- self._conn_lock.wait()
- return
-
- self._conn_lock.acquire()
-
- msg = "RabbitMQ connection down"
- self._logger(msg, level=SandeshLevel.SYS_ERR)
- self._update_sandesh_status(ConnectionStatus.DOWN)
- self._conn_state = ConnectionStatus.DOWN
-
- self._conn.close()
-
- self._conn.ensure_connection()
- self._conn.connect()
-
- self._update_sandesh_status(ConnectionStatus.UP)
- self._conn_state = ConnectionStatus.UP
- msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn)
- self._logger(msg, level=SandeshLevel.SYS_NOTICE)
-
- self._channel = self._conn.channel()
- if delete_old_q:
- # delete the old queue in first-connect context
- # as db-resync would have caught up with history.
- try:
- bound_q = self._update_queue_obj(self._channel)
- bound_q.delete()
- except Exception as e:
- msg = 'Unable to delete the old ampq queue: %s' %(str(e))
- self._logger(msg, level=SandeshLevel.SYS_ERR)
-
- self._consumer = kombu.Consumer(self._channel,
- queues=self._update_queue_obj,
- callbacks=[self._subscribe])
- self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange)
-
- self._conn_lock.release()
- # end _reconnect
-
- def _connection_watch(self):
- self.prepare_to_consume()
- while True:
- try:
- self._consumer.consume()
- self._conn.drain_events()
- except self._conn.connection_errors + self._conn.channel_errors as e:
- self._reconnect()
- # end _connection_watch
-
- def _publisher(self):
- message = None
- while True:
- try:
- if not message:
- # earlier was sent fine, dequeue one more
- message = self._publish_queue.get()
-
- while True:
- try:
- self._producer.publish(message)
- message = None
- break
- except self._conn.connection_errors + self._conn.channel_errors as e:
- self._reconnect()
- except Exception as e:
- log_str = "Unknown exception in _publisher greenlet" + str(e)
- self._logger(log_str, level=SandeshLevel.SYS_ERR)
- # end _publisher
-
- def _subscribe(self, body, message):
- try:
- self._subscribe_cb(body)
- finally:
- message.ack()
-
-
- def _start(self):
- self._reconnect(delete_old_q=True)
-
- self._publisher_greenlet = gevent.spawn(self._publisher)
- self._connection_monitor_greenlet = gevent.spawn(self._connection_watch)
-
- def shutdown(self):
- self._publisher_greenlet.kill()
- self._connection_monitor_greenlet.kill()
- self._producer.close()
- self._consumer.close()
- self._conn.close()
-
-
-class VncKombuClientV1(VncKombuClientBase):
- def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
- rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
- super(VncKombuClientV1, self).__init__(rabbit_ip, rabbit_port,
- rabbit_user, rabbit_password,
- rabbit_vhost, rabbit_ha_mode,
- q_name, subscribe_cb, logger)
-
- self._conn = kombu.Connection(hostname=self._rabbit_ip,
- port=self._rabbit_port,
- userid=self._rabbit_user,
- password=self._rabbit_password,
- virtual_host=self._rabbit_vhost)
- self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange, durable=False)
- self._start()
- # end __init__
-
-
-class VncKombuClientV2(VncKombuClientBase):
- def _parse_rabbit_hosts(self, rabbit_hosts):
- server_list = rabbit_hosts.split(",")
-
- default_dict = {'user': self._rabbit_user,
- 'password': self._rabbit_password,
- 'port': self._rabbit_port}
- ret = []
- for s in server_list:
- match = re.match("(?:(?P<user>.*?)(?::(?P<password>.*?))*@)*(?P<host>.*?)(?::(?P<port>\d+))*$", s)
- if match:
- mdict = match.groupdict().copy()
- for key in ['user', 'password', 'port']:
- if not mdict[key]:
- mdict[key] = default_dict[key]
-
- ret.append(mdict)
-
- return ret
-
- def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
- rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
- super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port,
- rabbit_user, rabbit_password,
- rabbit_vhost, rabbit_ha_mode,
- q_name, subscribe_cb, logger)
-
- _hosts = self._parse_rabbit_hosts(rabbit_hosts)
- self._urls = []
- for h in _hosts:
- h['vhost'] = "" if not rabbit_vhost else rabbit_vhost
- _url = "pyamqp://%(user)s:%(password)s@%(host)s:%(port)s/%(vhost)s/" % h
- self._urls.append(_url)
-
- msg = "Initializing RabbitMQ connection, urls %s" % self._urls
- self._logger(msg, level=SandeshLevel.SYS_NOTICE)
- self._update_sandesh_status(ConnectionStatus.INIT)
- self._conn_state = ConnectionStatus.INIT
- self._conn = kombu.Connection(self._urls)
- queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None
- self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange,
- durable=False,
- queue_arguments=queue_args)
-
- self._start()
- # end __init__
-
-
-from distutils.version import LooseVersion
-if LooseVersion(kombu.__version__) >= LooseVersion("2.5.0"):
- VncKombuClient = VncKombuClientV2
-else:
- VncKombuClient = VncKombuClientV1