diff options
Diffstat (limited to 'charms/trusty/kafka/actions')
-rwxr-xr-x | charms/trusty/kafka/actions/create-topic | 40 | ||||
-rwxr-xr-x | charms/trusty/kafka/actions/delete-topic | 36 | ||||
-rwxr-xr-x | charms/trusty/kafka/actions/list-topics | 31 | ||||
-rwxr-xr-x | charms/trusty/kafka/actions/list-zks | 28 | ||||
-rwxr-xr-x | charms/trusty/kafka/actions/read-topic | 35 | ||||
-rwxr-xr-x | charms/trusty/kafka/actions/write-topic | 36 |
6 files changed, 206 insertions, 0 deletions
diff --git a/charms/trusty/kafka/actions/create-topic b/charms/trusty/kafka/actions/create-topic new file mode 100755 index 0000000..4910430 --- /dev/null +++ b/charms/trusty/kafka/actions/create-topic @@ -0,0 +1,40 @@ +#!/usr/bin/env python +import sys + +try: + from charmhelpers.core import hookenv + from charmhelpers.core import unitdata + from jujubigdata import utils + from jujubigdata.relations import Zookeeper + charm_ready = unitdata.kv().get('charm.active', False) +except ImportError: + charm_ready = False + +if not charm_ready: + # might not have hookenv.action_fail available yet + from subprocess import call + call(['action-fail', 'Kafka service not yet ready']) + +# Grab the business +topic_name = hookenv.action_get('topic') +topic_partitions = hookenv.action_get('partitions') +topic_replication = hookenv.action_get('replication') + +# Create the topic if we've got zookeepers; otherwise fail. +if Zookeeper().connected_units() and Zookeeper().is_ready(): + zks = [] + for unit, data in Zookeeper().filtered_data().items(): + ip = utils.resolve_private_address(data['private-address']) + zks.append("%s:%s" % (ip, data['port'])) + zks.sort() + zookeepers = ",".join(zks) + output = utils.run_as('kafka', 'kafka-topics.sh', + '--zookeeper', zookeepers, '--create', + '--topic', topic_name, + '--partitions', topic_partitions, + '--replication-factor', topic_replication, + capture_output=True) + hookenv.action_set({'output': output}) +else: + hookenv.action_fail('Zookeeper relation is not present/ready') + sys.exit() diff --git a/charms/trusty/kafka/actions/delete-topic b/charms/trusty/kafka/actions/delete-topic new file mode 100755 index 0000000..b56f004 --- /dev/null +++ b/charms/trusty/kafka/actions/delete-topic @@ -0,0 +1,36 @@ +#!/usr/bin/env python +import sys + +try: + from charmhelpers.core import hookenv + from charmhelpers.core import unitdata + from jujubigdata import utils + from jujubigdata.relations import Zookeeper + charm_ready = unitdata.kv().get('charm.active', False) +except ImportError: + charm_ready = False + +if not charm_ready: + # might not have hookenv.action_fail available yet + from subprocess import call + call(['action-fail', 'Kafka service not yet ready']) + +# Grab the business +topic_name = hookenv.action_get('topic') + +# Delete the topic if we've got zookeepers; otherwise fail. +if Zookeeper().connected_units() and Zookeeper().is_ready(): + zks = [] + for unit, data in Zookeeper().filtered_data().items(): + ip = utils.resolve_private_address(data['private-address']) + zks.append("%s:%s" % (ip, data['port'])) + zks.sort() + zookeepers = ",".join(zks) + output = utils.run_as('kafka', 'kafka-topics.sh', + '--zookeeper', zookeepers, '--delete', + '--topic', topic_name, + capture_output=True) + hookenv.action_set({'output': output}) +else: + hookenv.action_fail('Zookeeper relation is not present/ready') + sys.exit() diff --git a/charms/trusty/kafka/actions/list-topics b/charms/trusty/kafka/actions/list-topics new file mode 100755 index 0000000..629d2b4 --- /dev/null +++ b/charms/trusty/kafka/actions/list-topics @@ -0,0 +1,31 @@ +#!/usr/bin/env python +import sys + +try: + from charmhelpers.core import hookenv + from charmhelpers.core import unitdata + from jujubigdata import utils + from jujubigdata.relations import Zookeeper + charm_ready = unitdata.kv().get('charm.active', False) +except ImportError: + charm_ready = False + +if not charm_ready: + # might not have hookenv.action_fail available yet + from subprocess import call + call(['action-fail', 'Kafka service not yet ready']) + +if Zookeeper().connected_units() and Zookeeper().is_ready(): + zks = [] + for unit, data in Zookeeper().filtered_data().items(): + ip = utils.resolve_private_address(data['private-address']) + zks.append("%s:%s" % (ip, data['port'])) + zks.sort() + zookeepers = ",".join(zks) + output = utils.run_as('kafka', '/usr/lib/kafka/bin/kafka-topics.sh', + '--zookeeper', zookeepers, '--list', + capture_output=True) + hookenv.action_set({'topics': output}) +else: + hookenv.action_fail('Zookeeper relation is not present/ready') + sys.exit() diff --git a/charms/trusty/kafka/actions/list-zks b/charms/trusty/kafka/actions/list-zks new file mode 100755 index 0000000..9de9e9a --- /dev/null +++ b/charms/trusty/kafka/actions/list-zks @@ -0,0 +1,28 @@ +#!/usr/bin/env python +import sys + +try: + from charmhelpers.core import hookenv + from charmhelpers.core import unitdata + from jujubigdata import utils + from jujubigdata.relations import Zookeeper + charm_ready = unitdata.kv().get('charm.active', False) +except ImportError: + charm_ready = False + +if not charm_ready: + # might not have hookenv.action_fail available yet + from subprocess import call + call(['action-fail', 'Kafka service not yet ready']) + +if Zookeeper().connected_units() and Zookeeper().is_ready(): + zks = [] + for unit, data in Zookeeper().filtered_data().items(): + ip = utils.resolve_private_address(data['private-address']) + zks.append("%s:%s" % (ip, data['port'])) + zks.sort() + zookeepers = ",".join(zks) + hookenv.action_set({'zookeepers': zookeepers}) +else: + hookenv.action_fail('Zookeeper relation is not present/ready') + sys.exit() diff --git a/charms/trusty/kafka/actions/read-topic b/charms/trusty/kafka/actions/read-topic new file mode 100755 index 0000000..9f59396 --- /dev/null +++ b/charms/trusty/kafka/actions/read-topic @@ -0,0 +1,35 @@ +#!/usr/bin/env python +#pylint: disable=C0103 +try: + from charmhelpers.core import hookenv + from charmhelpers.core import unitdata + import jujubigdata + from jujubigdata import utils + charm_ready = unitdata.kv().get('charm.active', False) +except ImportError: + charm_ready = False + +if not charm_ready: + # might not have hookenv.action_fail available yet + from subprocess import call + call(['action-fail', 'Kafka service not yet ready']) + +kafka_reqs = ['vendor', 'packages', 'groups', 'users', 'dirs', 'ports'] +dist_config = jujubigdata.utils.DistConfig(filename='dist.yaml', + required_keys=kafka_reqs) + +# Grab the business +topic_name = hookenv.action_get('topic') +topic_partition = hookenv.action_get('partition') + +output = utils.run_as( + 'kafka', 'kafka-simple-consumer-shell.sh', + '--broker-list', '{}:{}'.format( + hookenv.unit_private_ip(), + dist_config.port('kafka'), + ), + '--topic', topic_name, + '--partition', topic_partition, + '--no-wait-at-logend', + capture_output=True) +hookenv.action_set({'output': output}) diff --git a/charms/trusty/kafka/actions/write-topic b/charms/trusty/kafka/actions/write-topic new file mode 100755 index 0000000..fce8e1b --- /dev/null +++ b/charms/trusty/kafka/actions/write-topic @@ -0,0 +1,36 @@ +#!/usr/bin/env python +#pylint: disable=C0103 + +try: + from charmhelpers.core import hookenv + from charmhelpers.core import unitdata + import jujubigdata + from jujubigdata import utils + charm_ready = unitdata.kv().get('charm.active', False) +except ImportError: + charm_ready = False + +if not charm_ready: + # might not have hookenv.action_fail available yet + from subprocess import call + call(['action-fail', 'Kafka service not yet ready']) + + +kafka_reqs = ['vendor', 'packages', 'groups', 'users', 'dirs', 'ports'] +dist_config = jujubigdata.utils.DistConfig(filename='dist.yaml', + required_keys=kafka_reqs) + +# Grab the business +topic_name = hookenv.action_get('topic') +data = hookenv.action_get('data') + +output = utils.run_as( + 'kafka', 'kafka-console-producer.sh', + '--broker-list', '{}:{}'.format( + hookenv.unit_private_ip(), + dist_config.port('kafka'), + ), + '--topic', topic_name, + capture_output=True, + input=data) +hookenv.action_set({'output': output}) |