diff options
Diffstat (limited to 'Testcases/cfgm_common/zkclient.py')
-rw-r--r-- | Testcases/cfgm_common/zkclient.py | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/Testcases/cfgm_common/zkclient.py b/Testcases/cfgm_common/zkclient.py new file mode 100644 index 0000000..5c8d461 --- /dev/null +++ b/Testcases/cfgm_common/zkclient.py @@ -0,0 +1,358 @@ +# +# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. +# +import os +import gevent +import logging +import kazoo.client +import kazoo.exceptions +import kazoo.handlers.gevent +import kazoo.recipe.election +from kazoo.client import KazooState +from kazoo.retry import KazooRetry + +from bitarray import bitarray +from cfgm_common.exceptions import ResourceExhaustionError, ResourceExistsError +from gevent.coros import BoundedSemaphore + +import uuid + +LOG_DIR = '/var/log/contrail/' + +class IndexAllocator(object): + + def __init__(self, zookeeper_client, path, size=0, start_idx=0, + reverse=False,alloc_list=None, max_alloc=0): + if alloc_list is None: + self._alloc_list = [{'start':start_idx, 'end':start_idx+size}] + else: + sorted_alloc_list = sorted(alloc_list, key=lambda k: k['start']) + self._alloc_list = sorted_alloc_list + + alloc_count = len(self._alloc_list) + total_size = 0 + start_idx = self._alloc_list[0]['start'] + size = 0 + + #check for overlap in alloc_list --TODO + for alloc_idx in range (0, alloc_count -1): + idx_start_addr = self._alloc_list[alloc_idx]['start'] + idx_end_addr = self._alloc_list[alloc_idx]['end'] + next_start_addr = self._alloc_list[alloc_idx+1]['start'] + if next_start_addr <= idx_end_addr: + raise Exception() + size += idx_end_addr - idx_start_addr + 1 + size += self._alloc_list[alloc_count-1]['end'] - self._alloc_list[alloc_count-1]['start'] + 1 + + self._size = size + self._start_idx = start_idx + if max_alloc == 0: + self._max_alloc = self._size + else: + self._max_alloc = max_alloc + + self._zookeeper_client = zookeeper_client + self._path = path + self._in_use = bitarray('0') + self._reverse = reverse + for idx in self._zookeeper_client.get_children(path): + idx_int = self._get_bit_from_zk_index(int(idx)) + if idx_int >= 0: + self._set_in_use(idx_int) + # end for idx + # end __init__ + + def _get_zk_index_from_bit(self, idx): + size = idx + if self._reverse: + for alloc in reversed(self._alloc_list): + size -= alloc['end'] - alloc['start'] + 1 + if size < 0: + return alloc['start']-size - 1 + else: + for alloc in self._alloc_list: + size -= alloc['end'] - alloc['start'] + 1 + if size < 0: + return alloc['end']+size + 1 + + raise Exception() + # end _get_zk_index + + def _get_bit_from_zk_index(self, idx): + size = 0 + if self._reverse: + for alloc in reversed(self._alloc_list): + if alloc['start'] <= idx <= alloc['end']: + return alloc['end'] - idx + size + size += alloc['end'] - alloc['start'] + 1 + pass + else: + for alloc in self._alloc_list: + if alloc['start'] <= idx <= alloc['end']: + return idx - alloc['start'] + size + size += alloc['end'] - alloc['start'] + 1 + return -1 + # end _get_bit_from_zk_index + + def _set_in_use(self, idx): + # if the index is higher than _max_alloc, do not use the bitarray, in + # order to reduce the size of the bitarray. Otherwise, set the bit + # corresponding to idx to 1 and extend the _in_use bitarray if needed + if idx > self._max_alloc: + return + if idx >= self._in_use.length(): + temp = bitarray(idx - self._in_use.length()) + temp.setall(0) + temp.append('1') + self._in_use.extend(temp) + else: + self._in_use[idx] = 1 + # end _set_in_use + + def alloc(self, value=None): + if self._in_use.all(): + idx = self._in_use.length() + if idx > self._max_alloc: + raise ResourceExhaustionError() + self._in_use.append(1) + else: + idx = self._in_use.index(0) + self._in_use[idx] = 1 + + idx = self._get_zk_index_from_bit(idx) + try: + # Create a node at path and return its integer value + id_str = "%(#)010d" % {'#': idx} + self._zookeeper_client.create_node(self._path + id_str, value) + return idx + except ResourceExistsError: + return self.alloc(value) + # end alloc + + def reserve(self, idx, value=None): + bit_idx = self._get_bit_from_zk_index(idx) + if bit_idx < 0: + return None + try: + # Create a node at path and return its integer value + id_str = "%(#)010d" % {'#': idx} + self._zookeeper_client.create_node(self._path + id_str, value) + self._set_in_use(bit_idx) + return idx + except ResourceExistsError: + self._set_in_use(bit_idx) + return None + # end reserve + + def delete(self, idx): + id_str = "%(#)010d" % {'#': idx} + self._zookeeper_client.delete_node(self._path + id_str) + bit_idx = self._get_bit_from_zk_index(idx) + if 0 <= bit_idx < self._in_use.length(): + self._in_use[bit_idx] = 0 + # end delete + + def read(self, idx): + id_str = "%(#)010d" % {'#': idx} + id_val = self._zookeeper_client.read_node(self._path+id_str) + if id_val is not None: + bit_idx = self._get_bit_from_zk_index(idx) + if bit_idx >= 0: + self._set_in_use(bit_idx) + return id_val + # end read + + def empty(self): + return not self._in_use.any() + # end empty + + @classmethod + def delete_all(cls, zookeeper_client, path): + try: + zookeeper_client.delete_node(path, recursive=True) + except kazoo.exceptions.NotEmptyError: + #TODO: Add retries for NotEmptyError + zookeeper_client.syslog("NotEmptyError while deleting %s" % path) + # end delete_all + +#end class IndexAllocator + + +class ZookeeperClient(object): + + def __init__(self, module, server_list, logging_fn=None): + # logging + logger = logging.getLogger(module) + logger.setLevel(logging.INFO) + try: + handler = logging.handlers.RotatingFileHandler(LOG_DIR + module + '-zk.log', maxBytes=10*1024*1024, backupCount=5) + except IOError: + print "Cannot open log file in %s" %(LOG_DIR) + else: + log_format = logging.Formatter('%(asctime)s [%(name)s]: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') + handler.setFormatter(log_format) + logger.addHandler(handler) + + if logging_fn: + self.log = logging_fn + else: + self.log = self.syslog + + self._zk_client = \ + kazoo.client.KazooClient( + server_list, + timeout=400, + handler=kazoo.handlers.gevent.SequentialGeventHandler(), + logger=logger) + + self._zk_client.add_listener(self._zk_listener) + self._logger = logger + self._election = None + self._server_list = server_list + # KazooRetry to retry keeper CRUD operations + self._retry = KazooRetry(max_tries=None, max_delay=300, + sleep_func=gevent.sleep) + + self._conn_state = None + self._sandesh_connection_info_update(status='INIT', message='') + self._lost_cb = None + + self.connect() + # end __init__ + + # start + def connect(self): + while True: + try: + self._zk_client.start() + break + except gevent.event.Timeout as e: + # Update connection info + self._sandesh_connection_info_update(status='DOWN', + message=str(e)) + gevent.sleep(1) + # Zookeeper is also throwing exception due to delay in master election + except Exception as e: + # Update connection info + self._sandesh_connection_info_update(status='DOWN', + message=str(e)) + gevent.sleep(1) + # Update connection info + self._sandesh_connection_info_update(status='UP', message='') + + # end + + def is_connected(self): + return self._zk_client.state == KazooState.CONNECTED + # end is_connected + + def syslog(self, msg, *args, **kwargs): + if not self._logger: + return + self._logger.info(msg) + # end syslog + + def set_lost_cb(self, lost_cb=None): + # set a callback to be called when kazoo state is lost + # set to None for default action + self._lost_cb = lost_cb + # end set_lost_cb + + def _zk_listener(self, state): + if state == KazooState.CONNECTED: + if self._election: + self._election.cancel() + # Update connection info + self._sandesh_connection_info_update(status='UP', message='') + elif state == KazooState.LOST: + # Lost the session with ZooKeeper Server + # Best of option we have is to exit the process and restart all + # over again + if self._lost_cb: + self._lost_cb() + else: + os._exit(2) + elif state == KazooState.SUSPENDED: + # Update connection info + self._sandesh_connection_info_update(status='INIT', + message = 'Connection to zookeeper lost. Retrying') + + # end + + def _zk_election_callback(self, func, *args, **kwargs): + func(*args, **kwargs) + # Exit if running master encounters error or exception + exit(1) + # end + + def master_election(self, path, identifier, func, *args, **kwargs): + while True: + self._election = self._zk_client.Election(path, identifier) + self._election.run(self._zk_election_callback, func, *args, **kwargs) + # end master_election + + def create_node(self, path, value=None): + try: + if value is None: + value = uuid.uuid4() + retry = self._retry.copy() + retry(self._zk_client.create, path, str(value), makepath=True) + except kazoo.exceptions.NodeExistsError: + current_value = self.read_node(path) + if current_value == value: + return True; + raise ResourceExistsError(path, str(current_value)) + # end create_node + + def delete_node(self, path, recursive=False): + try: + retry = self._retry.copy() + retry(self._zk_client.delete, path, recursive=recursive) + except kazoo.exceptions.NoNodeError: + pass + except Exception as e: + raise e + # end delete_node + + def read_node(self, path): + try: + retry = self._retry.copy() + value = retry(self._zk_client.get, path) + return value[0] + except Exception: + return None + # end read_node + + def get_children(self, path): + try: + retry = self._retry.copy() + return retry(self._zk_client.get_children, path) + except Exception: + return [] + # end read_node + + def _sandesh_connection_info_update(self, status, message): + from pysandesh.connection_info import ConnectionState + from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \ + ConnectionType + from pysandesh.gen_py.sandesh.ttypes import SandeshLevel + + new_conn_state = getattr(ConnectionStatus, status) + ConnectionState.update(conn_type = ConnectionType.ZOOKEEPER, + name = 'Zookeeper', status = new_conn_state, + message = message, + server_addrs = self._server_list.split(',')) + + if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and + new_conn_state == ConnectionStatus.DOWN): + msg = 'Connection to Zookeeper down: %s' %(message) + self.log(msg, level=SandeshLevel.SYS_ERR) + if (self._conn_state and self._conn_state != new_conn_state and + new_conn_state == ConnectionStatus.UP): + msg = 'Connection to Zookeeper ESTABLISHED' + self.log(msg, level=SandeshLevel.SYS_NOTICE) + + self._conn_state = new_conn_state + # end _sandesh_connection_info_update + +# end class ZookeeperClient |