aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/kafka/hooks/callbacks.py
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/kafka/hooks/callbacks.py')
-rw-r--r--charms/trusty/kafka/hooks/callbacks.py181
1 files changed, 181 insertions, 0 deletions
diff --git a/charms/trusty/kafka/hooks/callbacks.py b/charms/trusty/kafka/hooks/callbacks.py
new file mode 100644
index 0000000..bac68cb
--- /dev/null
+++ b/charms/trusty/kafka/hooks/callbacks.py
@@ -0,0 +1,181 @@
+import os
+from socket import gaierror, gethostbyname, gethostname
+from subprocess import Popen, check_call
+
+import jujuresources
+from charmhelpers.core import hookenv, templating
+from charmhelpers.core import host
+from charmhelpers.core import unitdata
+from jujubigdata import utils
+from jujubigdata.relations import Zookeeper
+
+
+# Extended status support
+# We call update_blocked_status from the "requires" section of our service
+# block, so be sure to return True. Otherwise, we'll block the "requires"
+# and never move on to callbacks. The other status update methods are called
+# from the "callbacks" section and therefore don't need to return True.
+def update_blocked_status():
+ if unitdata.kv().get('charm.active', False):
+ return True
+ if not Zookeeper().connected_units():
+ hookenv.status_set('blocked', 'Waiting for relation to apache-zookeeper')
+ elif not Zookeeper().is_ready():
+ hookenv.status_set('waiting', 'Waiting for Zookeeper to become ready')
+ return True
+
+
+def update_working_status():
+ if unitdata.kv().get('charm.active', False):
+ hookenv.status_set('maintenance', 'Updating configuration')
+ return
+ hookenv.status_set('maintenance', 'Setting up Apache Kafka')
+
+
+def update_active_status():
+ unitdata.kv().set('charm.active', True)
+ hookenv.status_set('active', 'Ready')
+
+
+def clear_active_flag():
+ unitdata.kv().set('charm.active', False)
+
+
+# Main Kafka class for callbacks
+class Kafka(object):
+ def __init__(self, dist_config):
+ self.dist_config = dist_config
+ self.resources = {
+ 'kafka': 'kafka-%s' % host.cpu_arch(),
+ }
+ self.verify_resources = utils.verify_resources(*self.resources.values())
+
+ def fix_hostname(self):
+ # ensure hostname is resolvable
+ hostname = gethostname()
+ try:
+ gethostbyname(hostname)
+ except gaierror:
+ check_call(['sed', '-E', '-i', '-e',
+ '/127.0.0.1[[:blank:]]+/a \\\n127.0.1.1 ' + hostname,
+ '/etc/hosts'])
+
+ def is_installed(self):
+ return unitdata.kv().get('kafka.installed')
+
+ def install(self, force=False):
+ if not force and self.is_installed():
+ return
+ self.fix_hostname()
+ self.dist_config.add_users()
+ self.dist_config.add_dirs()
+ self.dist_config.add_packages()
+ jujuresources.install(self.resources['kafka'],
+ destination=self.dist_config.path('kafka'),
+ skip_top_level=True)
+ self.setup_kafka_config()
+ unitdata.kv().set('kafka.installed', True)
+
+ def setup_kafka_config(self):
+ '''
+ copy the default configuration files to kafka_conf property
+ defined in dist.yaml
+ '''
+ default_conf = self.dist_config.path('kafka') / 'config'
+ kafka_conf = self.dist_config.path('kafka_conf')
+ kafka_conf.rmtree_p()
+ default_conf.copytree(kafka_conf)
+ # Now remove the conf included in the tarball and symlink our real conf
+ # dir. we've seen issues where kafka still looks for config in
+ # KAFKA_HOME/config.
+ default_conf.rmtree_p()
+ kafka_conf.symlink(default_conf)
+
+ # Configure immutable bits
+ kafka_bin = self.dist_config.path('kafka') / 'bin'
+ with utils.environment_edit_in_place('/etc/environment') as env:
+ if kafka_bin not in env['PATH']:
+ env['PATH'] = ':'.join([env['PATH'], kafka_bin])
+ env['LOG_DIR'] = self.dist_config.path('kafka_app_logs')
+
+ # note: we set the advertised.host.name below to the public_address
+ # to ensure that external (non-Juju) clients can connect to Kafka
+ public_address = hookenv.unit_get('public-address')
+ private_ip = utils.resolve_private_address(hookenv.unit_get('private-address'))
+ kafka_server_conf = self.dist_config.path('kafka_conf') / 'server.properties'
+ service, unit_num = os.environ['JUJU_UNIT_NAME'].split('/', 1)
+ utils.re_edit_in_place(kafka_server_conf, {
+ r'^broker.id=.*': 'broker.id=%s' % unit_num,
+ r'^port=.*': 'port=%s' % self.dist_config.port('kafka'),
+ r'^log.dirs=.*': 'log.dirs=%s' % self.dist_config.path('kafka_data_logs'),
+ r'^#?advertised.host.name=.*': 'advertised.host.name=%s' % public_address,
+ })
+
+ kafka_log4j = self.dist_config.path('kafka_conf') / 'log4j.properties'
+ utils.re_edit_in_place(kafka_log4j, {
+ r'^kafka.logs.dir=.*': 'kafka.logs.dir=%s' % self.dist_config.path('kafka_app_logs'),
+ })
+
+ # fix for lxc containers and some corner cases in manual provider
+ # ensure that public_address is resolvable internally by mapping it to the private IP
+ utils.update_etc_hosts({private_ip: public_address})
+
+ templating.render(
+ 'upstart.conf',
+ '/etc/init/kafka.conf',
+ context={
+ 'kafka_conf': self.dist_config.path('kafka_conf'),
+ 'kafka_bin': '{}/bin'.format(self.dist_config.path('kafka'))
+ },
+ )
+
+ def configure_kafka(self):
+ # Get ip:port data from our connected zookeepers
+ if Zookeeper().connected_units() and Zookeeper().is_ready():
+ zks = []
+ for unit, data in Zookeeper().filtered_data().items():
+ ip = utils.resolve_private_address(data['private-address'])
+ zks.append("%s:%s" % (ip, data['port']))
+ zks.sort()
+ zk_connect = ",".join(zks)
+
+ # update consumer props
+ cfg = self.dist_config.path('kafka_conf') / 'consumer.properties'
+ utils.re_edit_in_place(cfg, {
+ r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect,
+ })
+
+ # update server props
+ cfg = self.dist_config.path('kafka_conf') / 'server.properties'
+ utils.re_edit_in_place(cfg, {
+ r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect,
+ })
+ else:
+ # if we have no zookeepers, make sure kafka is stopped
+ self.stop()
+
+ def run_bg(self, user, command, *args):
+ """
+ Run a Kafka command as the `kafka` user in the background.
+
+ :param str command: Command to run
+ :param list args: Additional args to pass to the command
+ """
+ parts = [command] + list(args)
+ quoted = ' '.join("'%s'" % p for p in parts)
+ e = utils.read_etc_env()
+ Popen(['su', user, '-c', quoted], env=e)
+
+ def restart(self):
+ self.stop()
+ self.start()
+
+ def start(self):
+ host.service_start('kafka')
+
+ def stop(self):
+ host.service_stop('kafka')
+
+ def cleanup(self):
+ self.dist_config.remove_users()
+ self.dist_config.remove_dirs()