diff options
Diffstat (limited to 'charms/trusty/kafka/hooks/callbacks.py')
-rw-r--r-- | charms/trusty/kafka/hooks/callbacks.py | 181 |
1 files changed, 0 insertions, 181 deletions
diff --git a/charms/trusty/kafka/hooks/callbacks.py b/charms/trusty/kafka/hooks/callbacks.py deleted file mode 100644 index bac68cb..0000000 --- a/charms/trusty/kafka/hooks/callbacks.py +++ /dev/null @@ -1,181 +0,0 @@ -import os -from socket import gaierror, gethostbyname, gethostname -from subprocess import Popen, check_call - -import jujuresources -from charmhelpers.core import hookenv, templating -from charmhelpers.core import host -from charmhelpers.core import unitdata -from jujubigdata import utils -from jujubigdata.relations import Zookeeper - - -# Extended status support -# We call update_blocked_status from the "requires" section of our service -# block, so be sure to return True. Otherwise, we'll block the "requires" -# and never move on to callbacks. The other status update methods are called -# from the "callbacks" section and therefore don't need to return True. -def update_blocked_status(): - if unitdata.kv().get('charm.active', False): - return True - if not Zookeeper().connected_units(): - hookenv.status_set('blocked', 'Waiting for relation to apache-zookeeper') - elif not Zookeeper().is_ready(): - hookenv.status_set('waiting', 'Waiting for Zookeeper to become ready') - return True - - -def update_working_status(): - if unitdata.kv().get('charm.active', False): - hookenv.status_set('maintenance', 'Updating configuration') - return - hookenv.status_set('maintenance', 'Setting up Apache Kafka') - - -def update_active_status(): - unitdata.kv().set('charm.active', True) - hookenv.status_set('active', 'Ready') - - -def clear_active_flag(): - unitdata.kv().set('charm.active', False) - - -# Main Kafka class for callbacks -class Kafka(object): - def __init__(self, dist_config): - self.dist_config = dist_config - self.resources = { - 'kafka': 'kafka-%s' % host.cpu_arch(), - } - self.verify_resources = utils.verify_resources(*self.resources.values()) - - def fix_hostname(self): - # ensure hostname is resolvable - hostname = gethostname() - try: - gethostbyname(hostname) - except gaierror: - check_call(['sed', '-E', '-i', '-e', - '/127.0.0.1[[:blank:]]+/a \\\n127.0.1.1 ' + hostname, - '/etc/hosts']) - - def is_installed(self): - return unitdata.kv().get('kafka.installed') - - def install(self, force=False): - if not force and self.is_installed(): - return - self.fix_hostname() - self.dist_config.add_users() - self.dist_config.add_dirs() - self.dist_config.add_packages() - jujuresources.install(self.resources['kafka'], - destination=self.dist_config.path('kafka'), - skip_top_level=True) - self.setup_kafka_config() - unitdata.kv().set('kafka.installed', True) - - def setup_kafka_config(self): - ''' - copy the default configuration files to kafka_conf property - defined in dist.yaml - ''' - default_conf = self.dist_config.path('kafka') / 'config' - kafka_conf = self.dist_config.path('kafka_conf') - kafka_conf.rmtree_p() - default_conf.copytree(kafka_conf) - # Now remove the conf included in the tarball and symlink our real conf - # dir. we've seen issues where kafka still looks for config in - # KAFKA_HOME/config. - default_conf.rmtree_p() - kafka_conf.symlink(default_conf) - - # Configure immutable bits - kafka_bin = self.dist_config.path('kafka') / 'bin' - with utils.environment_edit_in_place('/etc/environment') as env: - if kafka_bin not in env['PATH']: - env['PATH'] = ':'.join([env['PATH'], kafka_bin]) - env['LOG_DIR'] = self.dist_config.path('kafka_app_logs') - - # note: we set the advertised.host.name below to the public_address - # to ensure that external (non-Juju) clients can connect to Kafka - public_address = hookenv.unit_get('public-address') - private_ip = utils.resolve_private_address(hookenv.unit_get('private-address')) - kafka_server_conf = self.dist_config.path('kafka_conf') / 'server.properties' - service, unit_num = os.environ['JUJU_UNIT_NAME'].split('/', 1) - utils.re_edit_in_place(kafka_server_conf, { - r'^broker.id=.*': 'broker.id=%s' % unit_num, - r'^port=.*': 'port=%s' % self.dist_config.port('kafka'), - r'^log.dirs=.*': 'log.dirs=%s' % self.dist_config.path('kafka_data_logs'), - r'^#?advertised.host.name=.*': 'advertised.host.name=%s' % public_address, - }) - - kafka_log4j = self.dist_config.path('kafka_conf') / 'log4j.properties' - utils.re_edit_in_place(kafka_log4j, { - r'^kafka.logs.dir=.*': 'kafka.logs.dir=%s' % self.dist_config.path('kafka_app_logs'), - }) - - # fix for lxc containers and some corner cases in manual provider - # ensure that public_address is resolvable internally by mapping it to the private IP - utils.update_etc_hosts({private_ip: public_address}) - - templating.render( - 'upstart.conf', - '/etc/init/kafka.conf', - context={ - 'kafka_conf': self.dist_config.path('kafka_conf'), - 'kafka_bin': '{}/bin'.format(self.dist_config.path('kafka')) - }, - ) - - def configure_kafka(self): - # Get ip:port data from our connected zookeepers - if Zookeeper().connected_units() and Zookeeper().is_ready(): - zks = [] - for unit, data in Zookeeper().filtered_data().items(): - ip = utils.resolve_private_address(data['private-address']) - zks.append("%s:%s" % (ip, data['port'])) - zks.sort() - zk_connect = ",".join(zks) - - # update consumer props - cfg = self.dist_config.path('kafka_conf') / 'consumer.properties' - utils.re_edit_in_place(cfg, { - r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect, - }) - - # update server props - cfg = self.dist_config.path('kafka_conf') / 'server.properties' - utils.re_edit_in_place(cfg, { - r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect, - }) - else: - # if we have no zookeepers, make sure kafka is stopped - self.stop() - - def run_bg(self, user, command, *args): - """ - Run a Kafka command as the `kafka` user in the background. - - :param str command: Command to run - :param list args: Additional args to pass to the command - """ - parts = [command] + list(args) - quoted = ' '.join("'%s'" % p for p in parts) - e = utils.read_etc_env() - Popen(['su', user, '-c', quoted], env=e) - - def restart(self): - self.stop() - self.start() - - def start(self): - host.service_start('kafka') - - def stop(self): - host.service_stop('kafka') - - def cleanup(self): - self.dist_config.remove_users() - self.dist_config.remove_dirs() |