aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/kafka/actions/create-topic
blob: 49104300d1ecf2aa2d1c56f66c893629f17d5d3d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python
import sys

# Decide whether the charm is ready to run actions. If the charm libraries
# cannot be imported, the unit is treated as not ready.
try:
    from charmhelpers.core import hookenv
    from charmhelpers.core import unitdata
    from jujubigdata import utils
    from jujubigdata.relations import Zookeeper
    # A missing 'charm.active' key defaults to False (not ready).
    charm_ready = unitdata.kv().get('charm.active', False)
except ImportError:
    charm_ready = False

if not charm_ready:
    # might not have hookenv.action_fail available yet, so invoke the
    # 'action-fail' command directly instead of going through hookenv
    from subprocess import call
    call(['action-fail', 'Kafka service not yet ready'])
    # Stop here: the rest of the script uses hookenv/utils/Zookeeper, which
    # are undefined when the imports above failed, and must not try to
    # create a topic once the action has been reported as failed.
    sys.exit()

# Imported here so a failing kafka-topics.sh run can be reported cleanly.
from subprocess import CalledProcessError

# Read the action parameters: topic name, partition count and replication
# factor.
topic_name = hookenv.action_get('topic')
topic_partitions = hookenv.action_get('partitions')
topic_replication = hookenv.action_get('replication')

# Create the topic if we've got zookeepers; otherwise fail.
# A single relation object is reused for all three queries below.
zookeeper = Zookeeper()
if zookeeper.connected_units() and zookeeper.is_ready():
    # Build a sorted, comma-separated "ip:port" list to pass to --zookeeper.
    zks = sorted(
        "%s:%s" % (utils.resolve_private_address(data['private-address']),
                   data['port'])
        for data in zookeeper.filtered_data().values())
    zookeepers = ",".join(zks)
    try:
        output = utils.run_as('kafka', 'kafka-topics.sh',
                              '--zookeeper', zookeepers, '--create',
                              '--topic', topic_name,
                              '--partitions', topic_partitions,
                              '--replication-factor', topic_replication,
                              capture_output=True)
    except CalledProcessError as e:
        # NOTE(review): assumes utils.run_as raises CalledProcessError when
        # the command exits non-zero -- confirm against jujubigdata.
        # Report the command output and mark the action as failed instead of
        # dying with an unhandled traceback.
        hookenv.action_set({'output': e.output})
        hookenv.action_fail(
            'kafka-topics.sh failed with exit code %s' % e.returncode)
        sys.exit()
    hookenv.action_set({'output': output})
else:
    hookenv.action_fail('Zookeeper relation is not present/ready')
    sys.exit()