summaryrefslogtreecommitdiffstats
path: root/Testcases/cfgm_common/zkclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'Testcases/cfgm_common/zkclient.py')
-rw-r--r--Testcases/cfgm_common/zkclient.py358
1 files changed, 0 insertions, 358 deletions
diff --git a/Testcases/cfgm_common/zkclient.py b/Testcases/cfgm_common/zkclient.py
deleted file mode 100644
index 5c8d461..0000000
--- a/Testcases/cfgm_common/zkclient.py
+++ /dev/null
@@ -1,358 +0,0 @@
-#
-# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
-#
-import os
-import gevent
-import logging
-import kazoo.client
-import kazoo.exceptions
-import kazoo.handlers.gevent
-import kazoo.recipe.election
-from kazoo.client import KazooState
-from kazoo.retry import KazooRetry
-
-from bitarray import bitarray
-from cfgm_common.exceptions import ResourceExhaustionError, ResourceExistsError
-from gevent.coros import BoundedSemaphore
-
-import uuid
-
-LOG_DIR = '/var/log/contrail/'
-
-class IndexAllocator(object):
-
- def __init__(self, zookeeper_client, path, size=0, start_idx=0,
- reverse=False,alloc_list=None, max_alloc=0):
- if alloc_list is None:
- self._alloc_list = [{'start':start_idx, 'end':start_idx+size}]
- else:
- sorted_alloc_list = sorted(alloc_list, key=lambda k: k['start'])
- self._alloc_list = sorted_alloc_list
-
- alloc_count = len(self._alloc_list)
- total_size = 0
- start_idx = self._alloc_list[0]['start']
- size = 0
-
- #check for overlap in alloc_list --TODO
- for alloc_idx in range (0, alloc_count -1):
- idx_start_addr = self._alloc_list[alloc_idx]['start']
- idx_end_addr = self._alloc_list[alloc_idx]['end']
- next_start_addr = self._alloc_list[alloc_idx+1]['start']
- if next_start_addr <= idx_end_addr:
- raise Exception()
- size += idx_end_addr - idx_start_addr + 1
- size += self._alloc_list[alloc_count-1]['end'] - self._alloc_list[alloc_count-1]['start'] + 1
-
- self._size = size
- self._start_idx = start_idx
- if max_alloc == 0:
- self._max_alloc = self._size
- else:
- self._max_alloc = max_alloc
-
- self._zookeeper_client = zookeeper_client
- self._path = path
- self._in_use = bitarray('0')
- self._reverse = reverse
- for idx in self._zookeeper_client.get_children(path):
- idx_int = self._get_bit_from_zk_index(int(idx))
- if idx_int >= 0:
- self._set_in_use(idx_int)
- # end for idx
- # end __init__
-
- def _get_zk_index_from_bit(self, idx):
- size = idx
- if self._reverse:
- for alloc in reversed(self._alloc_list):
- size -= alloc['end'] - alloc['start'] + 1
- if size < 0:
- return alloc['start']-size - 1
- else:
- for alloc in self._alloc_list:
- size -= alloc['end'] - alloc['start'] + 1
- if size < 0:
- return alloc['end']+size + 1
-
- raise Exception()
- # end _get_zk_index
-
- def _get_bit_from_zk_index(self, idx):
- size = 0
- if self._reverse:
- for alloc in reversed(self._alloc_list):
- if alloc['start'] <= idx <= alloc['end']:
- return alloc['end'] - idx + size
- size += alloc['end'] - alloc['start'] + 1
- pass
- else:
- for alloc in self._alloc_list:
- if alloc['start'] <= idx <= alloc['end']:
- return idx - alloc['start'] + size
- size += alloc['end'] - alloc['start'] + 1
- return -1
- # end _get_bit_from_zk_index
-
- def _set_in_use(self, idx):
- # if the index is higher than _max_alloc, do not use the bitarray, in
- # order to reduce the size of the bitarray. Otherwise, set the bit
- # corresponding to idx to 1 and extend the _in_use bitarray if needed
- if idx > self._max_alloc:
- return
- if idx >= self._in_use.length():
- temp = bitarray(idx - self._in_use.length())
- temp.setall(0)
- temp.append('1')
- self._in_use.extend(temp)
- else:
- self._in_use[idx] = 1
- # end _set_in_use
-
- def alloc(self, value=None):
- if self._in_use.all():
- idx = self._in_use.length()
- if idx > self._max_alloc:
- raise ResourceExhaustionError()
- self._in_use.append(1)
- else:
- idx = self._in_use.index(0)
- self._in_use[idx] = 1
-
- idx = self._get_zk_index_from_bit(idx)
- try:
- # Create a node at path and return its integer value
- id_str = "%(#)010d" % {'#': idx}
- self._zookeeper_client.create_node(self._path + id_str, value)
- return idx
- except ResourceExistsError:
- return self.alloc(value)
- # end alloc
-
- def reserve(self, idx, value=None):
- bit_idx = self._get_bit_from_zk_index(idx)
- if bit_idx < 0:
- return None
- try:
- # Create a node at path and return its integer value
- id_str = "%(#)010d" % {'#': idx}
- self._zookeeper_client.create_node(self._path + id_str, value)
- self._set_in_use(bit_idx)
- return idx
- except ResourceExistsError:
- self._set_in_use(bit_idx)
- return None
- # end reserve
-
- def delete(self, idx):
- id_str = "%(#)010d" % {'#': idx}
- self._zookeeper_client.delete_node(self._path + id_str)
- bit_idx = self._get_bit_from_zk_index(idx)
- if 0 <= bit_idx < self._in_use.length():
- self._in_use[bit_idx] = 0
- # end delete
-
- def read(self, idx):
- id_str = "%(#)010d" % {'#': idx}
- id_val = self._zookeeper_client.read_node(self._path+id_str)
- if id_val is not None:
- bit_idx = self._get_bit_from_zk_index(idx)
- if bit_idx >= 0:
- self._set_in_use(bit_idx)
- return id_val
- # end read
-
- def empty(self):
- return not self._in_use.any()
- # end empty
-
- @classmethod
- def delete_all(cls, zookeeper_client, path):
- try:
- zookeeper_client.delete_node(path, recursive=True)
- except kazoo.exceptions.NotEmptyError:
- #TODO: Add retries for NotEmptyError
- zookeeper_client.syslog("NotEmptyError while deleting %s" % path)
- # end delete_all
-
-#end class IndexAllocator
-
-
-class ZookeeperClient(object):
-
- def __init__(self, module, server_list, logging_fn=None):
- # logging
- logger = logging.getLogger(module)
- logger.setLevel(logging.INFO)
- try:
- handler = logging.handlers.RotatingFileHandler(LOG_DIR + module + '-zk.log', maxBytes=10*1024*1024, backupCount=5)
- except IOError:
- print "Cannot open log file in %s" %(LOG_DIR)
- else:
- log_format = logging.Formatter('%(asctime)s [%(name)s]: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
- handler.setFormatter(log_format)
- logger.addHandler(handler)
-
- if logging_fn:
- self.log = logging_fn
- else:
- self.log = self.syslog
-
- self._zk_client = \
- kazoo.client.KazooClient(
- server_list,
- timeout=400,
- handler=kazoo.handlers.gevent.SequentialGeventHandler(),
- logger=logger)
-
- self._zk_client.add_listener(self._zk_listener)
- self._logger = logger
- self._election = None
- self._server_list = server_list
- # KazooRetry to retry keeper CRUD operations
- self._retry = KazooRetry(max_tries=None, max_delay=300,
- sleep_func=gevent.sleep)
-
- self._conn_state = None
- self._sandesh_connection_info_update(status='INIT', message='')
- self._lost_cb = None
-
- self.connect()
- # end __init__
-
- # start
- def connect(self):
- while True:
- try:
- self._zk_client.start()
- break
- except gevent.event.Timeout as e:
- # Update connection info
- self._sandesh_connection_info_update(status='DOWN',
- message=str(e))
- gevent.sleep(1)
- # Zookeeper is also throwing exception due to delay in master election
- except Exception as e:
- # Update connection info
- self._sandesh_connection_info_update(status='DOWN',
- message=str(e))
- gevent.sleep(1)
- # Update connection info
- self._sandesh_connection_info_update(status='UP', message='')
-
- # end
-
- def is_connected(self):
- return self._zk_client.state == KazooState.CONNECTED
- # end is_connected
-
- def syslog(self, msg, *args, **kwargs):
- if not self._logger:
- return
- self._logger.info(msg)
- # end syslog
-
- def set_lost_cb(self, lost_cb=None):
- # set a callback to be called when kazoo state is lost
- # set to None for default action
- self._lost_cb = lost_cb
- # end set_lost_cb
-
- def _zk_listener(self, state):
- if state == KazooState.CONNECTED:
- if self._election:
- self._election.cancel()
- # Update connection info
- self._sandesh_connection_info_update(status='UP', message='')
- elif state == KazooState.LOST:
- # Lost the session with ZooKeeper Server
- # Best of option we have is to exit the process and restart all
- # over again
- if self._lost_cb:
- self._lost_cb()
- else:
- os._exit(2)
- elif state == KazooState.SUSPENDED:
- # Update connection info
- self._sandesh_connection_info_update(status='INIT',
- message = 'Connection to zookeeper lost. Retrying')
-
- # end
-
- def _zk_election_callback(self, func, *args, **kwargs):
- func(*args, **kwargs)
- # Exit if running master encounters error or exception
- exit(1)
- # end
-
- def master_election(self, path, identifier, func, *args, **kwargs):
- while True:
- self._election = self._zk_client.Election(path, identifier)
- self._election.run(self._zk_election_callback, func, *args, **kwargs)
- # end master_election
-
- def create_node(self, path, value=None):
- try:
- if value is None:
- value = uuid.uuid4()
- retry = self._retry.copy()
- retry(self._zk_client.create, path, str(value), makepath=True)
- except kazoo.exceptions.NodeExistsError:
- current_value = self.read_node(path)
- if current_value == value:
- return True;
- raise ResourceExistsError(path, str(current_value))
- # end create_node
-
- def delete_node(self, path, recursive=False):
- try:
- retry = self._retry.copy()
- retry(self._zk_client.delete, path, recursive=recursive)
- except kazoo.exceptions.NoNodeError:
- pass
- except Exception as e:
- raise e
- # end delete_node
-
- def read_node(self, path):
- try:
- retry = self._retry.copy()
- value = retry(self._zk_client.get, path)
- return value[0]
- except Exception:
- return None
- # end read_node
-
- def get_children(self, path):
- try:
- retry = self._retry.copy()
- return retry(self._zk_client.get_children, path)
- except Exception:
- return []
- # end read_node
-
- def _sandesh_connection_info_update(self, status, message):
- from pysandesh.connection_info import ConnectionState
- from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \
- ConnectionType
- from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
-
- new_conn_state = getattr(ConnectionStatus, status)
- ConnectionState.update(conn_type = ConnectionType.ZOOKEEPER,
- name = 'Zookeeper', status = new_conn_state,
- message = message,
- server_addrs = self._server_list.split(','))
-
- if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and
- new_conn_state == ConnectionStatus.DOWN):
- msg = 'Connection to Zookeeper down: %s' %(message)
- self.log(msg, level=SandeshLevel.SYS_ERR)
- if (self._conn_state and self._conn_state != new_conn_state and
- new_conn_state == ConnectionStatus.UP):
- msg = 'Connection to Zookeeper ESTABLISHED'
- self.log(msg, level=SandeshLevel.SYS_NOTICE)
-
- self._conn_state = new_conn_state
- # end _sandesh_connection_info_update
-
-# end class ZookeeperClient