#!/usr/bin/env python
"""Juju action script that creates a Kafka topic.

Reads the ``topic``, ``partitions`` and ``replication`` action parameters,
builds a ZooKeeper connection string from the Zookeeper relation data and
runs ``kafka-topics.sh --create`` as the ``kafka`` user.  The captured
command output is published as the ``output`` action result.

The action is marked as failed (and the script stops) when the charm is not
yet flagged active in unitdata, when the charm libraries cannot be imported,
or when the Zookeeper relation is not connected and ready.
"""
import sys

try:
    from charmhelpers.core import hookenv
    from charmhelpers.core import unitdata
    from jujubigdata import utils
    from jujubigdata.relations import Zookeeper
    charm_ready = unitdata.kv().get('charm.active', False)
except ImportError:
    charm_ready = False

if not charm_ready:
    # might not have hookenv.action_fail available yet
    from subprocess import call
    call(['action-fail', 'Kafka service not yet ready'])
    # Stop here: the names imported above may be undefined (ImportError
    # path), and a topic must not be created on a unit that is not ready.
    sys.exit()

# Grab the business
topic_name = hookenv.action_get('topic')
topic_partitions = hookenv.action_get('partitions')
topic_replication = hookenv.action_get('replication')

# Create the topic if we've got zookeepers; otherwise fail.
zookeeper = Zookeeper()
if zookeeper.connected_units() and zookeeper.is_ready():
    # One "ip:port" entry per zookeeper unit, sorted so the connection
    # string is stable regardless of relation data ordering.
    zks = sorted(
        "%s:%s" % (utils.resolve_private_address(data['private-address']),
                   data['port'])
        for _unit, data in zookeeper.filtered_data().items()
    )
    zookeepers = ",".join(zks)
    output = utils.run_as('kafka', 'kafka-topics.sh',
                          '--zookeeper', zookeepers, '--create',
                          '--topic', topic_name,
                          '--partitions', topic_partitions,
                          '--replication-factor', topic_replication,
                          capture_output=True)
    hookenv.action_set({'output': output})
else:
    hookenv.action_fail('Zookeeper relation is not present/ready')
    sys.exit()