aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/kafka/hooks
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/kafka/hooks')
-rw-r--r--charms/trusty/kafka/hooks/callbacks.py181
-rwxr-xr-xcharms/trusty/kafka/hooks/common.py90
-rwxr-xr-xcharms/trusty/kafka/hooks/config-changed15
-rwxr-xr-xcharms/trusty/kafka/hooks/install17
-rwxr-xr-xcharms/trusty/kafka/hooks/kafka-relation-changed15
-rw-r--r--charms/trusty/kafka/hooks/setup.py33
-rwxr-xr-xcharms/trusty/kafka/hooks/start15
-rwxr-xr-xcharms/trusty/kafka/hooks/stop15
-rwxr-xr-xcharms/trusty/kafka/hooks/zookeeper-relation-changed15
-rwxr-xr-xcharms/trusty/kafka/hooks/zookeeper-relation-departed15
10 files changed, 411 insertions, 0 deletions
diff --git a/charms/trusty/kafka/hooks/callbacks.py b/charms/trusty/kafka/hooks/callbacks.py
new file mode 100644
index 0000000..bac68cb
--- /dev/null
+++ b/charms/trusty/kafka/hooks/callbacks.py
@@ -0,0 +1,181 @@
+import os
+from socket import gaierror, gethostbyname, gethostname
+from subprocess import Popen, check_call
+
+import jujuresources
+from charmhelpers.core import hookenv, templating
+from charmhelpers.core import host
+from charmhelpers.core import unitdata
+from jujubigdata import utils
+from jujubigdata.relations import Zookeeper
+
+
+# Extended status support
+# We call update_blocked_status from the "requires" section of our service
+# block, so be sure to return True. Otherwise, we'll block the "requires"
+# and never move on to callbacks. The other status update methods are called
+# from the "callbacks" section and therefore don't need to return True.
+def update_blocked_status():
+ if unitdata.kv().get('charm.active', False):
+ return True
+ if not Zookeeper().connected_units():
+ hookenv.status_set('blocked', 'Waiting for relation to apache-zookeeper')
+ elif not Zookeeper().is_ready():
+ hookenv.status_set('waiting', 'Waiting for Zookeeper to become ready')
+ return True
+
+
+def update_working_status():
+ if unitdata.kv().get('charm.active', False):
+ hookenv.status_set('maintenance', 'Updating configuration')
+ return
+ hookenv.status_set('maintenance', 'Setting up Apache Kafka')
+
+
+def update_active_status():
+ unitdata.kv().set('charm.active', True)
+ hookenv.status_set('active', 'Ready')
+
+
+def clear_active_flag():
+ unitdata.kv().set('charm.active', False)
+
+
+# Main Kafka class for callbacks
+class Kafka(object):
+ def __init__(self, dist_config):
+ self.dist_config = dist_config
+ self.resources = {
+ 'kafka': 'kafka-%s' % host.cpu_arch(),
+ }
+ self.verify_resources = utils.verify_resources(*self.resources.values())
+
+ def fix_hostname(self):
+ # ensure hostname is resolvable
+ hostname = gethostname()
+ try:
+ gethostbyname(hostname)
+ except gaierror:
+ check_call(['sed', '-E', '-i', '-e',
+ '/127.0.0.1[[:blank:]]+/a \\\n127.0.1.1 ' + hostname,
+ '/etc/hosts'])
+
+ def is_installed(self):
+ return unitdata.kv().get('kafka.installed')
+
+ def install(self, force=False):
+ if not force and self.is_installed():
+ return
+ self.fix_hostname()
+ self.dist_config.add_users()
+ self.dist_config.add_dirs()
+ self.dist_config.add_packages()
+ jujuresources.install(self.resources['kafka'],
+ destination=self.dist_config.path('kafka'),
+ skip_top_level=True)
+ self.setup_kafka_config()
+ unitdata.kv().set('kafka.installed', True)
+
+ def setup_kafka_config(self):
+ '''
+ copy the default configuration files to kafka_conf property
+ defined in dist.yaml
+ '''
+ default_conf = self.dist_config.path('kafka') / 'config'
+ kafka_conf = self.dist_config.path('kafka_conf')
+ kafka_conf.rmtree_p()
+ default_conf.copytree(kafka_conf)
+ # Now remove the conf included in the tarball and symlink our real conf
+ # dir. we've seen issues where kafka still looks for config in
+ # KAFKA_HOME/config.
+ default_conf.rmtree_p()
+ kafka_conf.symlink(default_conf)
+
+ # Configure immutable bits
+ kafka_bin = self.dist_config.path('kafka') / 'bin'
+ with utils.environment_edit_in_place('/etc/environment') as env:
+ if kafka_bin not in env['PATH']:
+ env['PATH'] = ':'.join([env['PATH'], kafka_bin])
+ env['LOG_DIR'] = self.dist_config.path('kafka_app_logs')
+
+ # note: we set the advertised.host.name below to the public_address
+ # to ensure that external (non-Juju) clients can connect to Kafka
+ public_address = hookenv.unit_get('public-address')
+ private_ip = utils.resolve_private_address(hookenv.unit_get('private-address'))
+ kafka_server_conf = self.dist_config.path('kafka_conf') / 'server.properties'
+ service, unit_num = os.environ['JUJU_UNIT_NAME'].split('/', 1)
+ utils.re_edit_in_place(kafka_server_conf, {
+ r'^broker.id=.*': 'broker.id=%s' % unit_num,
+ r'^port=.*': 'port=%s' % self.dist_config.port('kafka'),
+ r'^log.dirs=.*': 'log.dirs=%s' % self.dist_config.path('kafka_data_logs'),
+ r'^#?advertised.host.name=.*': 'advertised.host.name=%s' % public_address,
+ })
+
+ kafka_log4j = self.dist_config.path('kafka_conf') / 'log4j.properties'
+ utils.re_edit_in_place(kafka_log4j, {
+ r'^kafka.logs.dir=.*': 'kafka.logs.dir=%s' % self.dist_config.path('kafka_app_logs'),
+ })
+
+ # fix for lxc containers and some corner cases in manual provider
+ # ensure that public_address is resolvable internally by mapping it to the private IP
+ utils.update_etc_hosts({private_ip: public_address})
+
+ templating.render(
+ 'upstart.conf',
+ '/etc/init/kafka.conf',
+ context={
+ 'kafka_conf': self.dist_config.path('kafka_conf'),
+ 'kafka_bin': '{}/bin'.format(self.dist_config.path('kafka'))
+ },
+ )
+
+ def configure_kafka(self):
+ # Get ip:port data from our connected zookeepers
+ if Zookeeper().connected_units() and Zookeeper().is_ready():
+ zks = []
+ for unit, data in Zookeeper().filtered_data().items():
+ ip = utils.resolve_private_address(data['private-address'])
+ zks.append("%s:%s" % (ip, data['port']))
+ zks.sort()
+ zk_connect = ",".join(zks)
+
+ # update consumer props
+ cfg = self.dist_config.path('kafka_conf') / 'consumer.properties'
+ utils.re_edit_in_place(cfg, {
+ r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect,
+ })
+
+ # update server props
+ cfg = self.dist_config.path('kafka_conf') / 'server.properties'
+ utils.re_edit_in_place(cfg, {
+ r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect,
+ })
+ else:
+ # if we have no zookeepers, make sure kafka is stopped
+ self.stop()
+
+ def run_bg(self, user, command, *args):
+ """
+ Run a Kafka command as the `kafka` user in the background.
+
+ :param str command: Command to run
+ :param list args: Additional args to pass to the command
+ """
+ parts = [command] + list(args)
+ quoted = ' '.join("'%s'" % p for p in parts)
+ e = utils.read_etc_env()
+ Popen(['su', user, '-c', quoted], env=e)
+
+ def restart(self):
+ self.stop()
+ self.start()
+
+ def start(self):
+ host.service_start('kafka')
+
+ def stop(self):
+ host.service_stop('kafka')
+
+ def cleanup(self):
+ self.dist_config.remove_users()
+ self.dist_config.remove_dirs()
diff --git a/charms/trusty/kafka/hooks/common.py b/charms/trusty/kafka/hooks/common.py
new file mode 100755
index 0000000..1ca080b
--- /dev/null
+++ b/charms/trusty/kafka/hooks/common.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Common implementation for all hooks.
+"""
+
+import jujuresources
+from charmhelpers.core import hookenv
+from charmhelpers.core import unitdata
+from charmhelpers.core import charmframework
+
+
+def bootstrap_resources():
+ """
+ Install required resources defined in resources.yaml
+ """
+ if unitdata.kv().get('charm.bootstrapped', False):
+ return True
+ hookenv.status_set('maintenance', 'Installing base resources')
+ mirror_url = jujuresources.config_get('resources_mirror')
+ if not jujuresources.fetch(mirror_url=mirror_url):
+ missing = jujuresources.invalid()
+ hookenv.status_set('blocked', 'Unable to fetch required resource%s: %s' % (
+ 's' if len(missing) > 1 else '',
+ ', '.join(missing),
+ ))
+ return False
+ jujuresources.install(['pathlib', 'jujubigdata'])
+ unitdata.kv().set('charm.bootstrapped', True)
+ return True
+
+
+def manage():
+ if not bootstrap_resources():
+ # defer until resources are available, since charmhelpers, and thus
+ # the framework, are required (will require manual intervention)
+ return
+
+ import jujubigdata
+ import callbacks
+
+ kafka_reqs = ['vendor', 'packages', 'groups', 'users', 'dirs', 'ports']
+ dist_config = jujubigdata.utils.DistConfig(filename='dist.yaml',
+ required_keys=kafka_reqs)
+ kafka = callbacks.Kafka(dist_config)
+ manager = charmframework.Manager([
+ {
+ 'name': 'kafka',
+ 'provides': [
+ jujubigdata.relations.Kafka(port=dist_config.port('kafka'))
+ ],
+ 'requires': [
+ kafka.verify_resources,
+ jujubigdata.relations.Zookeeper(),
+ callbacks.update_blocked_status, # not really a requirement, but best way to fit into framework
+ ],
+ 'callbacks': [
+ callbacks.update_working_status,
+ kafka.install,
+ kafka.configure_kafka,
+ kafka.restart,
+ charmframework.helpers.open_ports(
+ dist_config.exposed_ports('kafka')),
+ callbacks.update_active_status,
+ ],
+ 'cleanup': [
+ callbacks.clear_active_flag,
+ charmframework.helpers.close_ports(
+ dist_config.exposed_ports('kafka')),
+ kafka.stop,
+ kafka.cleanup,
+ callbacks.update_blocked_status,
+ ],
+ },
+ ])
+ manager.manage()
+
+
+if __name__ == '__main__':
+ manage()
diff --git a/charms/trusty/kafka/hooks/config-changed b/charms/trusty/kafka/hooks/config-changed
new file mode 100755
index 0000000..1b4e92c
--- /dev/null
+++ b/charms/trusty/kafka/hooks/config-changed
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import common
+common.manage()
diff --git a/charms/trusty/kafka/hooks/install b/charms/trusty/kafka/hooks/install
new file mode 100755
index 0000000..7ea6d0f
--- /dev/null
+++ b/charms/trusty/kafka/hooks/install
@@ -0,0 +1,17 @@
+#!/usr/bin/python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import setup
+setup.pre_install()
+
+import common
+common.manage()
diff --git a/charms/trusty/kafka/hooks/kafka-relation-changed b/charms/trusty/kafka/hooks/kafka-relation-changed
new file mode 100755
index 0000000..1b4e92c
--- /dev/null
+++ b/charms/trusty/kafka/hooks/kafka-relation-changed
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import common
+common.manage()
diff --git a/charms/trusty/kafka/hooks/setup.py b/charms/trusty/kafka/hooks/setup.py
new file mode 100644
index 0000000..496edc6
--- /dev/null
+++ b/charms/trusty/kafka/hooks/setup.py
@@ -0,0 +1,33 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import subprocess
+from glob import glob
+
+
+def pre_install():
+ """
+ Do any setup required before the install hook.
+ """
+ install_pip()
+ install_bundled_resources()
+
+
+def install_pip():
+ subprocess.check_call(['apt-get', 'install', '-yq', 'python-pip', 'bzr'])
+
+
+def install_bundled_resources():
+ """
+ Install the bundled resources libraries.
+ """
+ archives = glob('resources/python/*')
+ subprocess.check_call(['pip', 'install'] + archives)
diff --git a/charms/trusty/kafka/hooks/start b/charms/trusty/kafka/hooks/start
new file mode 100755
index 0000000..1b4e92c
--- /dev/null
+++ b/charms/trusty/kafka/hooks/start
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import common
+common.manage()
diff --git a/charms/trusty/kafka/hooks/stop b/charms/trusty/kafka/hooks/stop
new file mode 100755
index 0000000..1b4e92c
--- /dev/null
+++ b/charms/trusty/kafka/hooks/stop
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import common
+common.manage()
diff --git a/charms/trusty/kafka/hooks/zookeeper-relation-changed b/charms/trusty/kafka/hooks/zookeeper-relation-changed
new file mode 100755
index 0000000..1b4e92c
--- /dev/null
+++ b/charms/trusty/kafka/hooks/zookeeper-relation-changed
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import common
+common.manage()
diff --git a/charms/trusty/kafka/hooks/zookeeper-relation-departed b/charms/trusty/kafka/hooks/zookeeper-relation-departed
new file mode 100755
index 0000000..1b4e92c
--- /dev/null
+++ b/charms/trusty/kafka/hooks/zookeeper-relation-departed
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import common
+common.manage()