1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#!/usr/bin/env python
#pylint: disable=C0103
# Juju action script: read messages from one partition of a Kafka topic and
# publish the consumer's captured output as the action result 'output'.
import sys

try:
    from charmhelpers.core import hookenv
    from charmhelpers.core import unitdata
    import jujubigdata
    from jujubigdata import utils
    # Read the 'charm.active' flag from the unit key/value store; a missing
    # key is treated as "not ready".
    charm_ready = unitdata.kv().get('charm.active', False)
except ImportError:
    # The helper libraries are not importable, so the charm cannot be ready.
    charm_ready = False

if not charm_ready:
    # might not have hookenv.action_fail available yet
    from subprocess import call
    call(['action-fail', 'Kafka service not yet ready'])
    # Stop here. Without this the script falls through to the code below,
    # which raises NameError when the imports above failed, or runs the
    # consumer against a service that was just reported as not ready.
    # NOTE(review): exits with status 0 on the assumption that action-fail
    # has already marked the action as failed and that a non-zero status
    # would replace its message -- confirm against the Juju version in use.
    sys.exit(0)

# Keys that dist.yaml must define for DistConfig to load.
kafka_reqs = ['vendor', 'packages', 'groups', 'users', 'dirs', 'ports']
dist_config = jujubigdata.utils.DistConfig(filename='dist.yaml',
                                           required_keys=kafka_reqs)

# Action parameters supplied by the caller.
topic_name = hookenv.action_get('topic')
topic_partition = hookenv.action_get('partition')

# Run the consumer as the 'kafka' user against this unit's private address
# and the configured 'kafka' port, capturing what it prints.
output = utils.run_as(
    'kafka', 'kafka-simple-consumer-shell.sh',
    '--broker-list', '{}:{}'.format(
        hookenv.unit_private_ip(),
        dist_config.port('kafka'),
    ),
    '--topic', topic_name,
    '--partition', topic_partition,
    '--no-wait-at-logend',
    capture_output=True)

hookenv.action_set({'output': output})
|