aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/kafka/hooks/callbacks.py
blob: bac68cbde3751a949a48cd9b37994a5170ec646d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
from socket import gaierror, gethostbyname, gethostname
from subprocess import Popen, check_call

import jujuresources
from charmhelpers.core import hookenv, templating
from charmhelpers.core import host
from charmhelpers.core import unitdata
from jujubigdata import utils
from jujubigdata.relations import Zookeeper


# Extended status support
# We call update_blocked_status from the "requires" section of our service
# block, so be sure to return True. Otherwise, we'll block the "requires"
# and never move on to callbacks. The other status update methods are called
# from the "callbacks" section and therefore don't need to return True.
def update_blocked_status():
    """Report why the charm cannot proceed yet; always returns True.

    Nothing is reported once the 'charm.active' flag is set in the unit's
    key/value store. Otherwise the status becomes 'blocked' when no
    Zookeeper units are connected, or 'waiting' when they are connected
    but not ready.
    """
    already_active = unitdata.kv().get('charm.active', False)
    if not already_active:
        if not Zookeeper().connected_units():
            hookenv.status_set('blocked', 'Waiting for relation to apache-zookeeper')
        elif not Zookeeper().is_ready():
            hookenv.status_set('waiting', 'Waiting for Zookeeper to become ready')
    # Always truthy so that using this as a "requires" entry never blocks
    # the callbacks from running.
    return True


def update_working_status():
    """Set a 'maintenance' status describing the work in progress.

    The message depends on whether the 'charm.active' flag is already set
    in the unit's key/value store.
    """
    if unitdata.kv().get('charm.active', False):
        message = 'Updating configuration'
    else:
        message = 'Setting up Apache Kafka'
    hookenv.status_set('maintenance', message)


def update_active_status():
    """Record the 'charm.active' flag and report the unit as 'active'."""
    store = unitdata.kv()
    store.set('charm.active', True)
    hookenv.status_set('active', 'Ready')


def clear_active_flag():
    """Reset the 'charm.active' flag in the unit's key/value store."""
    store = unitdata.kv()
    store.set('charm.active', False)


# Main Kafka class for callbacks
class Kafka(object):
    """Install, configure and control the Apache Kafka service on this unit.

    Paths, ports, users, directories and packages all come from the
    ``dist_config`` object given to the constructor. Whether the install
    step has already run is tracked in the unit's key/value store under
    the ``kafka.installed`` key.
    """

    def __init__(self, dist_config):
        """Store *dist_config* and prepare the resource verifier.

        :param dist_config: distribution config object; this class calls its
            ``path()``, ``port()``, ``add_*()`` and ``remove_*()`` methods.
        """
        self.dist_config = dist_config
        # The Kafka resource name is keyed on the CPU architecture.
        self.resources = {
            'kafka': 'kafka-%s' % host.cpu_arch(),
        }
        self.verify_resources = utils.verify_resources(*self.resources.values())

    def fix_hostname(self):
        """Make sure the local hostname resolves.

        If resolving the hostname raises ``gaierror``, ``sed`` is run on
        ``/etc/hosts`` to append a ``127.0.1.1 <hostname>`` line after each
        line matching the ``127.0.0.1`` pattern.

        :raises subprocess.CalledProcessError: if ``sed`` exits non-zero.
        """
        # ensure hostname is resolvable
        hostname = gethostname()
        try:
            gethostbyname(hostname)
        except gaierror:
            check_call(['sed', '-E', '-i', '-e',
                        '/127.0.0.1[[:blank:]]+/a \\\n127.0.1.1 ' + hostname,
                        '/etc/hosts'])

    def is_installed(self):
        """Return the stored ``kafka.installed`` value (unset means None)."""
        return unitdata.kv().get('kafka.installed')

    def install(self, force=False):
        """Install Kafka and write its initial configuration.

        Does nothing when Kafka is already marked installed, unless *force*
        is true.

        :param bool force: reinstall even if already marked installed.
        """
        if not force and self.is_installed():
            return
        self.fix_hostname()
        self.dist_config.add_users()
        self.dist_config.add_dirs()
        self.dist_config.add_packages()
        jujuresources.install(self.resources['kafka'],
                              destination=self.dist_config.path('kafka'),
                              skip_top_level=True)
        self.setup_kafka_config()
        # Only mark as installed once every step above has succeeded.
        unitdata.kv().set('kafka.installed', True)

    def setup_kafka_config(self):
        """Move the default config to ``kafka_conf`` and set the static bits.

        Copies the configuration shipped with Kafka to the ``kafka_conf``
        path defined in dist.yaml, symlinks the original location to it,
        then updates /etc/environment, server.properties, log4j.properties,
        /etc/hosts and the upstart job.
        """
        default_conf = self.dist_config.path('kafka') / 'config'
        kafka_conf = self.dist_config.path('kafka_conf')
        kafka_conf.rmtree_p()
        default_conf.copytree(kafka_conf)
        # Now remove the conf included in the tarball and symlink our real conf
        # dir. we've seen issues where kafka still looks for config in
        # KAFKA_HOME/config.
        default_conf.rmtree_p()
        kafka_conf.symlink(default_conf)

        # Configure immutable bits
        kafka_bin = self.dist_config.path('kafka') / 'bin'
        with utils.environment_edit_in_place('/etc/environment') as env:
            if kafka_bin not in env['PATH']:
                env['PATH'] = ':'.join([env['PATH'], kafka_bin])
            env['LOG_DIR'] = self.dist_config.path('kafka_app_logs')

        # note: we set the advertised.host.name below to the public_address
        # to ensure that external (non-Juju) clients can connect to Kafka
        public_address = hookenv.unit_get('public-address')
        private_ip = utils.resolve_private_address(hookenv.unit_get('private-address'))
        kafka_server_conf = self.dist_config.path('kafka_conf') / 'server.properties'
        # JUJU_UNIT_NAME looks like "<service>/<number>"; the number becomes
        # the broker id.
        _service, unit_num = os.environ['JUJU_UNIT_NAME'].split('/', 1)
        utils.re_edit_in_place(kafka_server_conf, {
            r'^broker.id=.*': 'broker.id=%s' % unit_num,
            r'^port=.*': 'port=%s' % self.dist_config.port('kafka'),
            r'^log.dirs=.*': 'log.dirs=%s' % self.dist_config.path('kafka_data_logs'),
            r'^#?advertised.host.name=.*': 'advertised.host.name=%s' % public_address,
        })

        kafka_log4j = self.dist_config.path('kafka_conf') / 'log4j.properties'
        utils.re_edit_in_place(kafka_log4j, {
            r'^kafka.logs.dir=.*': 'kafka.logs.dir=%s' % self.dist_config.path('kafka_app_logs'),
        })

        # fix for lxc containers and some corner cases in manual provider
        # ensure that public_address is resolvable internally by mapping it to the private IP
        utils.update_etc_hosts({private_ip: public_address})

        templating.render(
            'upstart.conf',
            '/etc/init/kafka.conf',
            context={
                'kafka_conf': self.dist_config.path('kafka_conf'),
                'kafka_bin': '{}/bin'.format(self.dist_config.path('kafka'))
            },
        )

    def configure_kafka(self):
        """Write the Zookeeper connection string, or stop Kafka without one.

        When Zookeeper units are connected and ready, a sorted,
        comma-separated ``ip:port`` list is written as ``zookeeper.connect``
        in both consumer.properties and server.properties. Otherwise the
        Kafka service is stopped.
        """
        # One relation object serves all three queries below.
        zookeeper = Zookeeper()
        if not (zookeeper.connected_units() and zookeeper.is_ready()):
            # if we have no zookeepers, make sure kafka is stopped
            self.stop()
            return

        # Get ip:port data from our connected zookeepers; sorting keeps the
        # resulting string independent of the relation data's ordering.
        zks = sorted(
            "%s:%s" % (utils.resolve_private_address(data['private-address']),
                       data['port'])
            for data in zookeeper.filtered_data().values())
        zk_connect = ",".join(zks)

        # update consumer props, then server props
        for props_name in ('consumer.properties', 'server.properties'):
            cfg = self.dist_config.path('kafka_conf') / props_name
            utils.re_edit_in_place(cfg, {
                r'^zookeeper.connect=.*': 'zookeeper.connect=%s' % zk_connect,
            })

    @staticmethod
    def _shell_join(parts):
        """Return *parts* joined into one single-quoted shell command line.

        Each part is wrapped in single quotes. An embedded single quote is
        replaced by the close-quote, backslash-quote, reopen-quote sequence
        so it cannot end the quoting early.
        """
        return ' '.join(
            "'%s'" % ('%s' % part).replace("'", "'\\''") for part in parts)

    def run_bg(self, user, command, *args):
        """
        Run a command as `user` in the background.

        The process is started with the environment returned by
        ``utils.read_etc_env()`` and is not waited on.

        :param str user: User to run the command as (via ``su``)
        :param str command: Command to run
        :param list args: Additional args to pass to the command
        """
        quoted = self._shell_join([command] + list(args))
        e = utils.read_etc_env()
        Popen(['su', user, '-c', quoted], env=e)

    def restart(self):
        """Stop and then start the Kafka service."""
        self.stop()
        self.start()

    def start(self):
        """Start the ``kafka`` system service."""
        host.service_start('kafka')

    def stop(self):
        """Stop the ``kafka`` system service."""
        host.service_stop('kafka')

    def cleanup(self):
        """Remove the users and directories defined by ``dist_config``."""
        self.dist_config.remove_users()
        self.dist_config.remove_dirs()