aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/kafka/actions
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/kafka/actions')
-rwxr-xr-xcharms/trusty/kafka/actions/create-topic40
-rwxr-xr-xcharms/trusty/kafka/actions/delete-topic36
-rwxr-xr-xcharms/trusty/kafka/actions/list-topics31
-rwxr-xr-xcharms/trusty/kafka/actions/list-zks28
-rwxr-xr-xcharms/trusty/kafka/actions/read-topic35
-rwxr-xr-xcharms/trusty/kafka/actions/write-topic36
6 files changed, 206 insertions, 0 deletions
diff --git a/charms/trusty/kafka/actions/create-topic b/charms/trusty/kafka/actions/create-topic
new file mode 100755
index 0000000..4910430
--- /dev/null
+++ b/charms/trusty/kafka/actions/create-topic
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+import sys
+
+try:
+ from charmhelpers.core import hookenv
+ from charmhelpers.core import unitdata
+ from jujubigdata import utils
+ from jujubigdata.relations import Zookeeper
+ charm_ready = unitdata.kv().get('charm.active', False)
+except ImportError:
+ charm_ready = False
+
+if not charm_ready:
+ # might not have hookenv.action_fail available yet
+ from subprocess import call
+ call(['action-fail', 'Kafka service not yet ready'])
+
+# Grab the business
+topic_name = hookenv.action_get('topic')
+topic_partitions = hookenv.action_get('partitions')
+topic_replication = hookenv.action_get('replication')
+
+# Create the topic if we've got zookeepers; otherwise fail.
+if Zookeeper().connected_units() and Zookeeper().is_ready():
+ zks = []
+ for unit, data in Zookeeper().filtered_data().items():
+ ip = utils.resolve_private_address(data['private-address'])
+ zks.append("%s:%s" % (ip, data['port']))
+ zks.sort()
+ zookeepers = ",".join(zks)
+ output = utils.run_as('kafka', 'kafka-topics.sh',
+ '--zookeeper', zookeepers, '--create',
+ '--topic', topic_name,
+ '--partitions', topic_partitions,
+ '--replication-factor', topic_replication,
+ capture_output=True)
+ hookenv.action_set({'output': output})
+else:
+ hookenv.action_fail('Zookeeper relation is not present/ready')
+ sys.exit()
diff --git a/charms/trusty/kafka/actions/delete-topic b/charms/trusty/kafka/actions/delete-topic
new file mode 100755
index 0000000..b56f004
--- /dev/null
+++ b/charms/trusty/kafka/actions/delete-topic
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import sys
+
+try:
+ from charmhelpers.core import hookenv
+ from charmhelpers.core import unitdata
+ from jujubigdata import utils
+ from jujubigdata.relations import Zookeeper
+ charm_ready = unitdata.kv().get('charm.active', False)
+except ImportError:
+ charm_ready = False
+
+if not charm_ready:
+ # might not have hookenv.action_fail available yet
+ from subprocess import call
+ call(['action-fail', 'Kafka service not yet ready'])
+
+# Grab the business
+topic_name = hookenv.action_get('topic')
+
+# Delete the topic if we've got zookeepers; otherwise fail.
+if Zookeeper().connected_units() and Zookeeper().is_ready():
+ zks = []
+ for unit, data in Zookeeper().filtered_data().items():
+ ip = utils.resolve_private_address(data['private-address'])
+ zks.append("%s:%s" % (ip, data['port']))
+ zks.sort()
+ zookeepers = ",".join(zks)
+ output = utils.run_as('kafka', 'kafka-topics.sh',
+ '--zookeeper', zookeepers, '--delete',
+ '--topic', topic_name,
+ capture_output=True)
+ hookenv.action_set({'output': output})
+else:
+ hookenv.action_fail('Zookeeper relation is not present/ready')
+ sys.exit()
diff --git a/charms/trusty/kafka/actions/list-topics b/charms/trusty/kafka/actions/list-topics
new file mode 100755
index 0000000..629d2b4
--- /dev/null
+++ b/charms/trusty/kafka/actions/list-topics
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+import sys
+
+try:
+ from charmhelpers.core import hookenv
+ from charmhelpers.core import unitdata
+ from jujubigdata import utils
+ from jujubigdata.relations import Zookeeper
+ charm_ready = unitdata.kv().get('charm.active', False)
+except ImportError:
+ charm_ready = False
+
+if not charm_ready:
+ # might not have hookenv.action_fail available yet
+ from subprocess import call
+ call(['action-fail', 'Kafka service not yet ready'])
+
+if Zookeeper().connected_units() and Zookeeper().is_ready():
+ zks = []
+ for unit, data in Zookeeper().filtered_data().items():
+ ip = utils.resolve_private_address(data['private-address'])
+ zks.append("%s:%s" % (ip, data['port']))
+ zks.sort()
+ zookeepers = ",".join(zks)
+ output = utils.run_as('kafka', '/usr/lib/kafka/bin/kafka-topics.sh',
+ '--zookeeper', zookeepers, '--list',
+ capture_output=True)
+ hookenv.action_set({'topics': output})
+else:
+ hookenv.action_fail('Zookeeper relation is not present/ready')
+ sys.exit()
diff --git a/charms/trusty/kafka/actions/list-zks b/charms/trusty/kafka/actions/list-zks
new file mode 100755
index 0000000..9de9e9a
--- /dev/null
+++ b/charms/trusty/kafka/actions/list-zks
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+import sys
+
+try:
+ from charmhelpers.core import hookenv
+ from charmhelpers.core import unitdata
+ from jujubigdata import utils
+ from jujubigdata.relations import Zookeeper
+ charm_ready = unitdata.kv().get('charm.active', False)
+except ImportError:
+ charm_ready = False
+
+if not charm_ready:
+ # might not have hookenv.action_fail available yet
+ from subprocess import call
+ call(['action-fail', 'Kafka service not yet ready'])
+
+if Zookeeper().connected_units() and Zookeeper().is_ready():
+ zks = []
+ for unit, data in Zookeeper().filtered_data().items():
+ ip = utils.resolve_private_address(data['private-address'])
+ zks.append("%s:%s" % (ip, data['port']))
+ zks.sort()
+ zookeepers = ",".join(zks)
+ hookenv.action_set({'zookeepers': zookeepers})
+else:
+ hookenv.action_fail('Zookeeper relation is not present/ready')
+ sys.exit()
diff --git a/charms/trusty/kafka/actions/read-topic b/charms/trusty/kafka/actions/read-topic
new file mode 100755
index 0000000..9f59396
--- /dev/null
+++ b/charms/trusty/kafka/actions/read-topic
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+#pylint: disable=C0103
+try:
+ from charmhelpers.core import hookenv
+ from charmhelpers.core import unitdata
+ import jujubigdata
+ from jujubigdata import utils
+ charm_ready = unitdata.kv().get('charm.active', False)
+except ImportError:
+ charm_ready = False
+
+if not charm_ready:
+ # might not have hookenv.action_fail available yet
+ from subprocess import call
+ call(['action-fail', 'Kafka service not yet ready'])
+
+kafka_reqs = ['vendor', 'packages', 'groups', 'users', 'dirs', 'ports']
+dist_config = jujubigdata.utils.DistConfig(filename='dist.yaml',
+ required_keys=kafka_reqs)
+
+# Grab the business
+topic_name = hookenv.action_get('topic')
+topic_partition = hookenv.action_get('partition')
+
+output = utils.run_as(
+ 'kafka', 'kafka-simple-consumer-shell.sh',
+ '--broker-list', '{}:{}'.format(
+ hookenv.unit_private_ip(),
+ dist_config.port('kafka'),
+ ),
+ '--topic', topic_name,
+ '--partition', topic_partition,
+ '--no-wait-at-logend',
+ capture_output=True)
+hookenv.action_set({'output': output})
diff --git a/charms/trusty/kafka/actions/write-topic b/charms/trusty/kafka/actions/write-topic
new file mode 100755
index 0000000..fce8e1b
--- /dev/null
+++ b/charms/trusty/kafka/actions/write-topic
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+#pylint: disable=C0103
+
+try:
+ from charmhelpers.core import hookenv
+ from charmhelpers.core import unitdata
+ import jujubigdata
+ from jujubigdata import utils
+ charm_ready = unitdata.kv().get('charm.active', False)
+except ImportError:
+ charm_ready = False
+
+if not charm_ready:
+ # might not have hookenv.action_fail available yet
+ from subprocess import call
+ call(['action-fail', 'Kafka service not yet ready'])
+
+
+kafka_reqs = ['vendor', 'packages', 'groups', 'users', 'dirs', 'ports']
+dist_config = jujubigdata.utils.DistConfig(filename='dist.yaml',
+ required_keys=kafka_reqs)
+
+# Grab the business
+topic_name = hookenv.action_get('topic')
+data = hookenv.action_get('data')
+
+output = utils.run_as(
+ 'kafka', 'kafka-console-producer.sh',
+ '--broker-list', '{}:{}'.format(
+ hookenv.unit_private_ip(),
+ dist_config.port('kafka'),
+ ),
+ '--topic', topic_name,
+ capture_output=True,
+ input=data)
+hookenv.action_set({'output': output})