aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/cassandra/tests
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/cassandra/tests')
-rw-r--r--charms/trusty/cassandra/tests/__init__.py15
-rwxr-xr-xcharms/trusty/cassandra/tests/base.py43
-rwxr-xr-xcharms/trusty/cassandra/tests/test_actions.py1156
-rwxr-xr-xcharms/trusty/cassandra/tests/test_definitions.py104
-rwxr-xr-xcharms/trusty/cassandra/tests/test_helpers.py1466
-rwxr-xr-xcharms/trusty/cassandra/tests/test_integration.py620
-rw-r--r--charms/trusty/cassandra/tests/tests.yaml15
7 files changed, 3419 insertions, 0 deletions
diff --git a/charms/trusty/cassandra/tests/__init__.py b/charms/trusty/cassandra/tests/__init__.py
new file mode 100644
index 0000000..b1b7fcd
--- /dev/null
+++ b/charms/trusty/cassandra/tests/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/charms/trusty/cassandra/tests/base.py b/charms/trusty/cassandra/tests/base.py
new file mode 100755
index 0000000..d308985
--- /dev/null
+++ b/charms/trusty/cassandra/tests/base.py
@@ -0,0 +1,43 @@
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import functools
+from itertools import count
+import unittest
+from unittest.mock import patch
+
+from testing.mocks import mock_charmhelpers
+
+patch = functools.partial(patch, autospec=True) # autospec by default.
+
+
+class TestCaseBase(unittest.TestCase):
+ def setUp(self):
+ super(TestCaseBase, self).setUp()
+
+ mock_charmhelpers(self)
+
+ is_lxc = patch('helpers.is_lxc', return_value=False)
+ is_lxc.start()
+ self.addCleanup(is_lxc.stop)
+
+ emit = patch('helpers.emit')
+ emit.start()
+ self.addCleanup(emit.stop)
+
+ time = patch('time.time', side_effect=count(1))
+ time.start()
+ self.addCleanup(time.stop)
diff --git a/charms/trusty/cassandra/tests/test_actions.py b/charms/trusty/cassandra/tests/test_actions.py
new file mode 100755
index 0000000..f97df0c
--- /dev/null
+++ b/charms/trusty/cassandra/tests/test_actions.py
@@ -0,0 +1,1156 @@
+#!.venv3/bin/python3
+
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import errno
+from itertools import repeat
+import os.path
+import re
+import shutil
+import subprocess
+import tempfile
+from textwrap import dedent
+import unittest
+from unittest.mock import ANY, call, patch, sentinel
+import yaml
+
+import cassandra
+from charmhelpers.core import hookenv
+
+from tests.base import TestCaseBase
+import actions
+from coordinator import coordinator
+import helpers
+
+
+class TestActions(TestCaseBase):
+ def test_action_wrapper(self):
+ @actions.action
+ def somefunc(*args, **kw):
+ return 42, args, kw
+
+ hookenv.hook_name.return_value = 'catch-fire'
+
+ # The wrapper stripts the servicename argument, which we have no
+ # use for, logs a message and invokes the wrapped function.
+ hookenv.remote_unit.return_value = None
+ self.assertEqual(somefunc('sn', 1, foo=4), (42, (1,), dict(foo=4)))
+ hookenv.log.assert_called_once_with('** Action catch-fire/somefunc')
+
+ # Different log message if there is a remote unit.
+ hookenv.log.reset_mock()
+ os.environ['JUJU_REMOTE_UNIT'] = 'foo'
+ self.assertEqual(somefunc('sn', 1, foo=4), (42, (1,), dict(foo=4)))
+ hookenv.log.assert_called_once_with(
+ '** Action catch-fire/somefunc (foo)')
+
+ def test_revert_unchangeable_config(self):
+ config = hookenv.config()
+
+ self.assertIn('datacenter', actions.UNCHANGEABLE_KEYS)
+
+ # In the first hook, revert does nothing as there is nothing to
+ # revert too.
+ config['datacenter'] = 'mission_control'
+ self.assertTrue(config.changed('datacenter'))
+ actions.revert_unchangeable_config('')
+ self.assertEqual(config['datacenter'], 'mission_control')
+
+ config.save()
+ config.load_previous()
+ config['datacenter'] = 'orbital_1'
+
+ actions.revert_unchangeable_config('')
+ self.assertEqual(config['datacenter'], 'mission_control') # Reverted
+
+ hookenv.log.assert_any_call(ANY, hookenv.ERROR) # Logged the problem.
+
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_leader_only(self, is_leader):
+
+ @actions.leader_only
+ def f(*args, **kw):
+ return args, kw
+
+ is_leader.return_value = False
+ self.assertIsNone(f(1, foo='bar'))
+
+ is_leader.return_value = True
+ self.assertEqual(f(1, foo='bar'), ((1,), dict(foo='bar')))
+
+ def test_set_proxy(self):
+ # NB. Environment is already mocked.
+ os.environ['http_proxy'] = ''
+ os.environ['https_proxy'] = ''
+ actions.set_proxy('')
+ self.assertEqual(os.environ['http_proxy'], '')
+ self.assertEqual(os.environ['https_proxy'], '')
+ hookenv.config()['http_proxy'] = 'foo'
+ actions.set_proxy('')
+ self.assertEqual(os.environ['http_proxy'], 'foo')
+ self.assertEqual(os.environ['https_proxy'], 'foo')
+
+ @patch('subprocess.check_call')
+ def test_preinstall(self, check_call):
+ # Noop if there are no preinstall hooks found running the
+ # install hook.
+ hookenv.hook_name.return_value = 'install'
+ actions.preinstall('')
+ self.assertFalse(check_call.called)
+ hookenv.log.assert_any_call('No preinstall hooks found')
+
+ # If preinstall hooks are found running the install hook,
+ # the preinstall hooks are run.
+ hook_dirs = []
+ hook_files = []
+ for i in range(1, 3):
+ hook_dirs.append(os.path.join(hookenv.charm_dir(),
+ 'exec.d', str(i)))
+ hook_files.append(os.path.join(hook_dirs[-1], 'charm-pre-install'))
+
+ os.makedirs(hook_dirs[-1])
+ with open(hook_files[-1], 'w') as f1:
+ print('mocked', file=f1)
+ os.chmod(hook_files[-1], 0o755)
+
+ check_call.reset_mock()
+ actions.preinstall('')
+
+ calls = [call(['sh', '-c', f2]) for f2 in hook_files]
+ check_call.assert_has_calls(calls)
+
+ # If a preinstall hook is not executable, a warning is raised.
+ hook_dir = os.path.join(hookenv.charm_dir(), 'exec.d', '55')
+ hook_file = os.path.join(hook_dir, 'charm-pre-install')
+ os.makedirs(hook_dir)
+ with open(hook_file, 'w') as f1:
+ print('whoops', file=f1)
+ os.chmod(hook_file, 0o644)
+ check_call.reset_mock()
+ hookenv.log.reset_mock()
+ actions.preinstall('')
+ check_call.assert_has_calls(calls) # Only previous hooks run.
+ hookenv.log.assert_has_calls([
+ call(ANY),
+ call(ANY),
+ call(ANY, hookenv.WARNING)])
+
+ # Nothing happens if the install hook is not being run.
+ hookenv.hook_name.return_value = 'config-changed'
+ check_call.reset_mock()
+ actions.preinstall('')
+ self.assertFalse(check_call.called)
+
+ @patch('subprocess.check_call')
+ def test_swapoff(self, check_call):
+ fstab = (
+ b'UUID=abc / ext4 errors=remount-ro 0 1\n'
+ b'/dev/mapper/cryptswap1 none swap sw 0 0')
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(fstab)
+ f.flush()
+ actions.swapoff('', f.name)
+ f.seek(0)
+ self.assertTrue(b'swap' not in f.read())
+
+ check_call.assert_called_once_with(['swapoff', '-a'])
+
+ @patch('subprocess.check_call')
+ def test_swapoff_fails(self, check_call):
+ check_call.side_effect = RuntimeError()
+ actions.swapoff('', '')
+ # A warning is generated if swapoff fails.
+ hookenv.log.assert_any_call(ANY, hookenv.WARNING)
+
+ @patch('subprocess.check_call')
+ def test_swapoff_lxc(self, check_call):
+ # Under LXC, the swapoff action does nothing except log.
+ helpers.is_lxc.return_value = True
+ actions.swapoff('')
+ self.assertFalse(check_call.called)
+
+ @patch('charmhelpers.fetch.configure_sources')
+ def test_configure_sources(self, configure_sources):
+ config = hookenv.config()
+
+ # fetch.configure_sources called the first time
+ actions.configure_sources('')
+ configure_sources.assert_called_once_with(True)
+
+ # fetch.configure_sources not called if relevant config is unchanged.
+ config.save()
+ config.load_previous()
+ configure_sources.reset_mock()
+ actions.configure_sources('')
+ self.assertFalse(configure_sources.called)
+
+ # Changing install_sources causes fetch.configure_sources to be
+ # called.
+ config.save()
+ config.load_previous()
+ configure_sources.reset_mock()
+ config['install_sources'] = 'foo'
+ actions.configure_sources('')
+ configure_sources.assert_called_once_with(True)
+
+ # Changing install_keys causes fetch.configure_sources to be
+ # called.
+ config.save()
+ config.load_previous()
+ configure_sources.reset_mock()
+ config['install_keys'] = 'foo'
+ actions.configure_sources('')
+ configure_sources.assert_called_once_with(True)
+
+ @patch('charmhelpers.core.hookenv.charm_dir')
+ @patch('subprocess.check_call')
+ def test_add_implicit_package_signing_keys(self, check_call, charm_dir):
+ charm_dir.return_value = os.path.join(os.path.dirname(__file__),
+ os.pardir)
+ actions.add_implicit_package_signing_keys('')
+
+ keys = ['apache', 'datastax']
+
+ self.assertEqual(check_call.call_count, len(keys))
+
+ for k in keys:
+ with self.subTest(key=k):
+ path = os.path.join(hookenv.charm_dir(),
+ 'lib', '{}.key'.format(k))
+ self.assertTrue(os.path.exists(path))
+ check_call.assert_any_call(['apt-key', 'add', path],
+ stdin=subprocess.DEVNULL)
+
+ @patch('charmhelpers.core.host.write_file')
+ @patch('subprocess.check_call')
+ def test_reset_sysctl(self, check_call, write_file):
+ actions.reset_sysctl('')
+
+ ctl_file = '/etc/sysctl.d/99-cassandra.conf'
+ # Magic value per Cassandra best practice.
+ write_file.assert_called_once_with(ctl_file,
+ b"vm.max_map_count = 131072\n")
+ check_call.assert_called_once_with(['sysctl', '-p',
+ '/etc/sysctl.d/99-cassandra.conf'])
+
+ @patch('subprocess.check_call')
+ @patch('charmhelpers.core.host.write_file')
+ def test_reset_sysctl_expected_fails(self, write_file, check_call):
+ check_call.side_effect = repeat(OSError(errno.EACCES,
+ 'Permission Denied'))
+ actions.reset_sysctl('')
+ # A warning is generated if permission denied was raised.
+ hookenv.log.assert_any_call(ANY, hookenv.WARNING)
+
+ @patch('subprocess.check_call')
+ @patch('charmhelpers.core.host.write_file')
+ def test_reset_sysctl_fails_badly(self, write_file, check_call):
+ # Other OSErrors are reraised since we don't know how to handle
+ # them.
+ check_call.side_effect = repeat(OSError(errno.EFAULT, 'Whoops'))
+ self.assertRaises(OSError, actions.reset_sysctl, '')
+
+ @patch('subprocess.check_call')
+ def test_reset_sysctl_lxc(self, check_call):
+ helpers.is_lxc.return_value = True
+ actions.reset_sysctl('')
+ self.assertFalse(check_call.called)
+ hookenv.log.assert_any_call('In an LXC. '
+ 'Leaving sysctl unchanged.')
+
+ @patch('helpers.get_cassandra_packages')
+ @patch('helpers.ensure_package_status')
+ def test_ensure_cassandra_package_status(self, ensure_package_status,
+ get_cassandra_packages):
+ get_cassandra_packages.return_value = sentinel.cassandra_packages
+ actions.ensure_cassandra_package_status('')
+ ensure_package_status.assert_called_once_with(
+ sentinel.cassandra_packages)
+
+ @patch('subprocess.check_call')
+ @patch('helpers.get_jre')
+ @patch('helpers.get_cassandra_packages')
+ @patch('helpers.install_packages')
+ def test_install_cassandra_packages(self, install_packages,
+ get_cassandra_packages,
+ get_jre, check_call):
+ get_cassandra_packages.return_value = sentinel.cassandra_packages
+ get_jre.return_value = 'openjdk'
+ actions.install_cassandra_packages('')
+ install_packages.assert_called_once_with(sentinel.cassandra_packages)
+ check_call.assert_called_once_with(['update-java-alternatives',
+ '--jre-headless', '--set',
+ 'java-1.8.0-openjdk-amd64'])
+
+ @patch('subprocess.check_call')
+ @patch('helpers.get_jre')
+ @patch('helpers.get_cassandra_packages')
+ @patch('helpers.install_packages')
+ def test_install_cassandra_packages_oracle(self, install_packages,
+ get_cassandra_packages,
+ get_jre, check_call):
+ get_cassandra_packages.return_value = sentinel.cassandra_packages
+ get_jre.return_value = 'oracle'
+ actions.install_cassandra_packages('')
+ install_packages.assert_called_once_with(sentinel.cassandra_packages)
+ # No alternatives selected, as the Oracle JRE installer method
+ # handles this.
+ self.assertFalse(check_call.called)
+
+ @patch('actions._install_oracle_jre_tarball')
+ @patch('actions._fetch_oracle_jre')
+ def test_install_oracle_jre(self, fetch, install_tarball):
+ fetch.return_value = sentinel.tarball
+
+ actions.install_oracle_jre('')
+ self.assertFalse(fetch.called)
+ self.assertFalse(install_tarball.called)
+
+ hookenv.config()['jre'] = 'oracle'
+ actions.install_oracle_jre('')
+ fetch.assert_called_once_with()
+ install_tarball.assert_called_once_with(sentinel.tarball)
+
+ @patch('helpers.status_set')
+ @patch('urllib.request')
+ def test_fetch_oracle_jre(self, req, status_set):
+ config = hookenv.config()
+ url = 'https://foo.example.com/server-jre-7u42-linux-x64.tar.gz'
+ expected_tarball = os.path.join(hookenv.charm_dir(), 'lib',
+ 'server-jre-7u42-linux-x64.tar.gz')
+ config['private_jre_url'] = url
+
+ # Create a dummy tarball, since the mock urlretrieve won't.
+ os.makedirs(os.path.dirname(expected_tarball))
+ with open(expected_tarball, 'w'):
+ pass # Empty file
+
+ self.assertEqual(actions._fetch_oracle_jre(), expected_tarball)
+ req.urlretrieve.assert_called_once_with(url, expected_tarball)
+
+ def test_fetch_oracle_jre_local(self):
+ # Create an existing tarball. If it is found, it will be used
+ # without needing to specify a remote url or actually download
+ # anything.
+ expected_tarball = os.path.join(hookenv.charm_dir(), 'lib',
+ 'server-jre-7u42-linux-x64.tar.gz')
+ os.makedirs(os.path.dirname(expected_tarball))
+ with open(expected_tarball, 'w'):
+ pass # Empty file
+
+ self.assertEqual(actions._fetch_oracle_jre(), expected_tarball)
+
+ @patch('helpers.status_set')
+ def test_fetch_oracle_jre_notfound(self, status_set):
+ with self.assertRaises(SystemExit) as x:
+ actions._fetch_oracle_jre()
+ self.assertEqual(x.code, 0)
+ status_set.assert_called_once_with('blocked', ANY)
+
+ @patch('subprocess.check_call')
+ @patch('charmhelpers.core.host.mkdir')
+ @patch('os.path.isdir')
+ def test_install_oracle_jre_tarball(self, isdir, mkdir, check_call):
+ isdir.return_value = False
+
+ dest = '/usr/lib/jvm/java-8-oracle'
+
+ actions._install_oracle_jre_tarball(sentinel.tarball)
+ mkdir.assert_called_once_with(dest)
+ check_call.assert_has_calls([
+ call(['tar', '-xz', '-C', dest,
+ '--strip-components=1', '-f', sentinel.tarball]),
+ call(['update-alternatives', '--install',
+ '/usr/bin/java', 'java',
+ os.path.join(dest, 'bin', 'java'), '1']),
+ call(['update-alternatives', '--set', 'java',
+ os.path.join(dest, 'bin', 'java')]),
+ call(['update-alternatives', '--install',
+ '/usr/bin/javac', 'javac',
+ os.path.join(dest, 'bin', 'javac'), '1']),
+ call(['update-alternatives', '--set', 'javac',
+ os.path.join(dest, 'bin', 'javac')])])
+
+ @patch('os.path.exists')
+ @patch('subprocess.check_call')
+ @patch('charmhelpers.core.host.mkdir')
+ @patch('os.path.isdir')
+ def test_install_oracle_jre_tarball_already(self, isdir,
+ mkdir, check_call, exists):
+ isdir.return_value = True
+ exists.return_value = True # jre already installed
+
+ # Store the version previously installed.
+ hookenv.config()['oracle_jre_tarball'] = sentinel.tarball
+
+ dest = '/usr/lib/jvm/java-8-oracle'
+
+ actions._install_oracle_jre_tarball(sentinel.tarball)
+
+ self.assertFalse(mkdir.called) # The jvm dir already existed.
+
+ exists.assert_called_once_with('/usr/lib/jvm/java-8-oracle/bin/java')
+
+ # update-alternatives done, but tarball not extracted.
+ check_call.assert_has_calls([
+ call(['update-alternatives', '--install',
+ '/usr/bin/java', 'java',
+ os.path.join(dest, 'bin', 'java'), '1']),
+ call(['update-alternatives', '--set', 'java',
+ os.path.join(dest, 'bin', 'java')]),
+ call(['update-alternatives', '--install',
+ '/usr/bin/javac', 'javac',
+ os.path.join(dest, 'bin', 'javac'), '1']),
+ call(['update-alternatives', '--set', 'javac',
+ os.path.join(dest, 'bin', 'javac')])])
+
+ @patch('subprocess.check_output')
+ def test_emit_java_version(self, check_output):
+ check_output.return_value = 'Line 1\nLine 2'
+ actions.emit_java_version('')
+ check_output.assert_called_once_with(['java', '-version'],
+ universal_newlines=True)
+ hookenv.log.assert_has_calls([call(ANY),
+ call('JRE: Line 1'),
+ call('JRE: Line 2')])
+
+ @patch('helpers.configure_cassandra_yaml')
+ def test_configure_cassandra_yaml(self, configure_cassandra_yaml):
+ # actions.configure_cassandra_yaml is just a wrapper around the
+ # helper.
+ actions.configure_cassandra_yaml('')
+ configure_cassandra_yaml.assert_called_once_with()
+
+ @patch('helpers.get_cassandra_env_file')
+ @patch('charmhelpers.core.host.write_file')
+ def test_configure_cassandra_env(self, write_file, env_file):
+ def _wf(path, contents, perms=None):
+ with open(path, 'wb') as f:
+ f.write(contents)
+ write_file.side_effect = _wf
+
+ # cassandra-env.sh is a shell script that unfortunately
+ # embeds configuration we need to change.
+ existing_config = dedent('''\
+ Everything is ignored
+ unless a regexp matches
+ #MAX_HEAP_SIZE="1G"
+ #HEAP_NEWSIZE="800M"
+ #JMX_PORT="1234"
+ And done
+ ''')
+
+ with tempfile.TemporaryDirectory() as tempdir:
+ cassandra_env = os.path.join(tempdir, 'c.sh')
+ env_file.return_value = cassandra_env
+
+ with open(cassandra_env, 'w', encoding='UTF-8') as f:
+ f.write(existing_config)
+
+ overrides = dict(
+ max_heap_size=re.compile('^MAX_HEAP_SIZE=(.*)$', re.M),
+ heap_newsize=re.compile('^HEAP_NEWSIZE=(.*)$', re.M))
+
+ for key in overrides:
+ hookenv.config()[key] = ''
+
+ # By default, the settings will be commented out.
+ actions.configure_cassandra_env('')
+ with open(cassandra_env, 'r', encoding='UTF-8') as f:
+ generated_env = f.read()
+ for config_key, regexp in overrides.items():
+ with self.subTest(override=config_key):
+ self.assertIsNone(regexp.search(generated_env))
+
+ # Settings can be overridden.
+ for config_key, regexp in overrides.items():
+ hookenv.config()[config_key] = '{} val'.format(config_key)
+ actions.configure_cassandra_env('')
+ with open(cassandra_env, 'r') as f:
+ generated_env = f.read()
+ for config_key, regexp in overrides.items():
+ with self.subTest(override=config_key):
+ match = regexp.search(generated_env)
+ self.assertIsNotNone(match)
+ # Note the value has been shell quoted.
+ self.assertTrue(
+ match.group(1).startswith(
+ "'{} val'".format(config_key)))
+
+ # Settings can be returned to the defaults.
+ for config_key, regexp in overrides.items():
+ hookenv.config()[config_key] = ''
+ actions.configure_cassandra_env('')
+ with open(cassandra_env, 'r', encoding='UTF-8') as f:
+ generated_env = f.read()
+ for config_key, regexp in overrides.items():
+ with self.subTest(override=config_key):
+ self.assertIsNone(regexp.search(generated_env))
+
+ @patch('helpers.get_cassandra_rackdc_file')
+ def test_configure_cassandra_rackdc(self, rackdc_file):
+ hookenv.config()['datacenter'] = 'test_dc'
+ hookenv.config()['rack'] = 'test_rack'
+ with tempfile.NamedTemporaryFile() as rackdc:
+ rackdc_file.return_value = rackdc.name
+ actions.configure_cassandra_rackdc('')
+ with open(rackdc.name, 'r') as f:
+ self.assertEqual(f.read().strip(),
+ 'dc=test_dc\nrack=test_rack')
+
+ @patch('helpers.connect')
+ @patch('helpers.get_auth_keyspace_replication')
+ @patch('helpers.num_nodes')
+ def test_needs_reset_auth_keyspace_replication(self, num_nodes,
+ get_auth_ks_rep,
+ connect):
+ num_nodes.return_value = 4
+ connect().__enter__.return_value = sentinel.session
+ connect().__exit__.return_value = False
+ get_auth_ks_rep.return_value = {'another': '8'}
+ self.assertTrue(actions.needs_reset_auth_keyspace_replication())
+
+ @patch('helpers.connect')
+ @patch('helpers.get_auth_keyspace_replication')
+ @patch('helpers.num_nodes')
+ def test_needs_reset_auth_keyspace_replication_false(self, num_nodes,
+ get_auth_ks_rep,
+ connect):
+ config = hookenv.config()
+ config['datacenter'] = 'mydc'
+ connect().__enter__.return_value = sentinel.session
+ connect().__exit__.return_value = False
+
+ num_nodes.return_value = 3
+ get_auth_ks_rep.return_value = {'another': '8',
+ 'mydc': '3'}
+ self.assertFalse(actions.needs_reset_auth_keyspace_replication())
+
+ @patch('helpers.set_active')
+ @patch('helpers.repair_auth_keyspace')
+ @patch('helpers.connect')
+ @patch('helpers.set_auth_keyspace_replication')
+ @patch('helpers.get_auth_keyspace_replication')
+ @patch('helpers.num_nodes')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_reset_auth_keyspace_replication(self, is_leader, num_nodes,
+ get_auth_ks_rep,
+ set_auth_ks_rep,
+ connect, repair, set_active):
+ is_leader.return_value = True
+ num_nodes.return_value = 4
+ coordinator.grants = {}
+ coordinator.requests = {hookenv.local_unit(): {}}
+ coordinator.grant('repair', hookenv.local_unit())
+ config = hookenv.config()
+ config['datacenter'] = 'mydc'
+ connect().__enter__.return_value = sentinel.session
+ connect().__exit__.return_value = False
+ get_auth_ks_rep.return_value = {'another': '8'}
+ self.assertTrue(actions.needs_reset_auth_keyspace_replication())
+ actions.reset_auth_keyspace_replication('')
+ set_auth_ks_rep.assert_called_once_with(
+ sentinel.session,
+ {'class': 'NetworkTopologyStrategy', 'another': '8', 'mydc': 4})
+ repair.assert_called_once_with()
+ set_active.assert_called_once_with()
+
+ def test_store_unit_private_ip(self):
+ hookenv.unit_private_ip.side_effect = None
+ hookenv.unit_private_ip.return_value = sentinel.ip
+ actions.store_unit_private_ip('')
+ self.assertEqual(hookenv.config()['unit_private_ip'], sentinel.ip)
+
+ @patch('charmhelpers.core.host.service_start')
+ @patch('helpers.status_set')
+ @patch('helpers.actual_seed_ips')
+ @patch('helpers.get_seed_ips')
+ @patch('relations.StorageRelation.needs_remount')
+ @patch('helpers.is_bootstrapped')
+ @patch('helpers.is_cassandra_running')
+ @patch('helpers.is_decommissioned')
+ def test_needs_restart(self, is_decom, is_running, is_bootstrapped,
+ needs_remount, seed_ips, actual_seeds,
+ status_set, service_start):
+ is_decom.return_value = False
+ is_running.return_value = True
+ needs_remount.return_value = False
+ seed_ips.return_value = set(['1.2.3.4'])
+ actual_seeds.return_value = set(['1.2.3.4'])
+
+ config = hookenv.config()
+ config['configured_seeds'] = list(sorted(seed_ips()))
+ config.save()
+ config.load_previous() # Ensure everything flagged as unchanged.
+
+ self.assertFalse(actions.needs_restart())
+
+ # Decommissioned nodes are not restarted.
+ is_decom.return_value = True
+ self.assertFalse(actions.needs_restart())
+ is_decom.return_value = False
+ self.assertFalse(actions.needs_restart())
+
+ # Nodes not running need to be restarted.
+ is_running.return_value = False
+ self.assertTrue(actions.needs_restart())
+ is_running.return_value = True
+ self.assertFalse(actions.needs_restart())
+
+ # If we have a new mountpoint, we need to restart in order to
+ # migrate data.
+ needs_remount.return_value = True
+ self.assertTrue(actions.needs_restart())
+ needs_remount.return_value = False
+ self.assertFalse(actions.needs_restart())
+
+ # Certain changed config items trigger a restart.
+ config['max_heap_size'] = '512M'
+ self.assertTrue(actions.needs_restart())
+ config.save()
+ config.load_previous()
+ self.assertFalse(actions.needs_restart())
+
+ # A new IP address requires a restart.
+ config['unit_private_ip'] = 'new'
+ self.assertTrue(actions.needs_restart())
+ config.save()
+ config.load_previous()
+ self.assertFalse(actions.needs_restart())
+
+ # If the seeds have changed, we need to restart.
+ seed_ips.return_value = set(['9.8.7.6'])
+ actual_seeds.return_value = set(['9.8.7.6'])
+ self.assertTrue(actions.needs_restart())
+ is_running.side_effect = iter([False, True])
+ helpers.start_cassandra()
+ is_running.side_effect = None
+ is_running.return_value = True
+ self.assertFalse(actions.needs_restart())
+
+ @patch('charmhelpers.core.hookenv.is_leader')
+ @patch('helpers.is_bootstrapped')
+ @patch('helpers.ensure_database_directories')
+ @patch('helpers.remount_cassandra')
+ @patch('helpers.start_cassandra')
+ @patch('helpers.stop_cassandra')
+ @patch('helpers.status_set')
+ def test_maybe_restart(self, status_set, stop_cassandra, start_cassandra,
+ remount, ensure_directories, is_bootstrapped,
+ is_leader):
+ coordinator.grants = {}
+ coordinator.requests = {hookenv.local_unit(): {}}
+ coordinator.relid = 'cluster:1'
+ coordinator.grant('restart', hookenv.local_unit())
+ actions.maybe_restart('')
+ stop_cassandra.assert_called_once_with()
+ remount.assert_called_once_with()
+ ensure_directories.assert_called_once_with()
+ start_cassandra.assert_called_once_with()
+
+ @patch('helpers.stop_cassandra')
+ def test_stop_cassandra(self, helpers_stop_cassandra):
+ actions.stop_cassandra('ignored')
+ helpers_stop_cassandra.assert_called_once_with()
+
+ @patch('helpers.start_cassandra')
+ def test_start_cassandra(self, helpers_start_cassandra):
+ actions.start_cassandra('ignored')
+ helpers_start_cassandra.assert_called_once_with()
+
+ @patch('os.path.isdir')
+ @patch('helpers.get_all_database_directories')
+ @patch('helpers.set_io_scheduler')
+ def test_reset_all_io_schedulers(self, set_io_scheduler, dbdirs, isdir):
+ hookenv.config()['io_scheduler'] = sentinel.io_scheduler
+ dbdirs.return_value = dict(
+ data_file_directories=[sentinel.d1, sentinel.d2],
+ commitlog_directory=sentinel.cl,
+ saved_caches_directory=sentinel.sc)
+ isdir.return_value = True
+ actions.reset_all_io_schedulers('')
+ set_io_scheduler.assert_has_calls([
+ call(sentinel.io_scheduler, sentinel.d1),
+ call(sentinel.io_scheduler, sentinel.d2),
+ call(sentinel.io_scheduler, sentinel.cl),
+ call(sentinel.io_scheduler, sentinel.sc)],
+ any_order=True)
+
+ # If directories don't exist yet, nothing happens.
+ set_io_scheduler.reset_mock()
+ isdir.return_value = False
+ actions.reset_all_io_schedulers('')
+ self.assertFalse(set_io_scheduler.called)
+
+ def test_config_key_lists_complete(self):
+ # Ensure that we have listed all keys in either
+ # RESTART_REQUIRED_KEYS, RESTART_NOT_REQUIRED_KEYS or
+ # UNCHANGEABLE_KEYS. This is to ensure that RESTART_REQUIRED_KEYS
+ # is maintained as new config items are added over time.
+ config_path = os.path.join(os.path.dirname(__file__), os.pardir,
+ 'config.yaml')
+ with open(config_path, 'r') as f:
+ config = yaml.safe_load(f)
+
+ combined = actions.RESTART_REQUIRED_KEYS.union(
+ actions.RESTART_NOT_REQUIRED_KEYS).union(
+ actions.UNCHANGEABLE_KEYS)
+
+ for key in config['options']:
+ with self.subTest(key=key):
+ self.assertIn(key, combined)
+
+ @patch('actions._publish_database_relation')
+ def test_publish_database_relations(self, publish_db_rel):
+ actions.publish_database_relations('')
+ publish_db_rel.assert_called_once_with('database:1', superuser=False)
+
+ @patch('actions._publish_database_relation')
+ def test_publish_database_admin_relations(self, publish_db_rel):
+ actions.publish_database_admin_relations('')
+ publish_db_rel.assert_called_once_with('database-admin:1',
+ superuser=True)
+
+ @patch('helpers.leader_ping')
+ @patch('helpers.ensure_user')
+ @patch('helpers.connect')
+ @patch('helpers.get_service_name')
+ @patch('helpers.encrypt_password')
+ @patch('charmhelpers.core.host.pwgen')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ @patch('actions._client_credentials')
+ def test_publish_database_relation_leader(self, client_creds, is_leader,
+ pwgen, encrypt_password,
+ get_service_name,
+ connect, ensure_user,
+ leader_ping):
+ is_leader.return_value = True # We are the leader.
+ client_creds.return_value = (None, None) # No creds published yet.
+
+ get_service_name.return_value = 'cservice'
+ pwgen.side_effect = iter(['secret1', 'secret2'])
+ encrypt_password.side_effect = iter(['crypt1', 'crypt2'])
+ connect().__enter__.return_value = sentinel.session
+
+ config = hookenv.config()
+ config['native_transport_port'] = 666
+ config['rpc_port'] = 777
+ config['cluster_name'] = 'fred'
+ config['datacenter'] = 'mission_control'
+ config['rack'] = '01'
+
+ actions._publish_database_relation('database:1', superuser=False)
+
+ ensure_user.assert_called_once_with(sentinel.session,
+ 'juju_cservice', 'crypt1',
+ False)
+ leader_ping.assert_called_once_with() # Peers woken.
+
+ hookenv.relation_set.assert_has_calls([
+ call('database:1',
+ username='juju_cservice', password='secret1',
+ host='10.30.0.1', native_transport_port=666, rpc_port=777,
+ cluster_name='fred', datacenter='mission_control',
+ rack='01')])
+
+ @patch('helpers.leader_ping')
+ @patch('helpers.ensure_user')
+ @patch('helpers.connect')
+ @patch('helpers.get_service_name')
+ @patch('helpers.encrypt_password')
+ @patch('charmhelpers.core.host.pwgen')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ @patch('actions._client_credentials')
+ def test_publish_database_relation_super(self, client_creds, is_leader,
+ pwgen, encrypt_password,
+ get_service_name,
+ connect, ensure_user,
+ leader_ping):
+ is_leader.return_value = True # We are the leader.
+ client_creds.return_value = (None, None) # No creds published yet.
+
+ get_service_name.return_value = 'cservice'
+ pwgen.side_effect = iter(['secret1', 'secret2'])
+ encrypt_password.side_effect = iter(['crypt1', 'crypt2'])
+ connect().__enter__.return_value = sentinel.session
+
+ config = hookenv.config()
+ config['native_transport_port'] = 666
+ config['rpc_port'] = 777
+ config['cluster_name'] = 'fred'
+ config['datacenter'] = 'mission_control'
+ config['rack'] = '01'
+
+ actions._publish_database_relation('database:1', superuser=True)
+
+ ensure_user.assert_called_once_with(sentinel.session,
+ 'juju_cservice_admin', 'crypt1',
+ True)
+
+ @patch('charmhelpers.core.host.write_file')
+ def test_install_maintenance_crontab(self, write_file):
+ # First 7 units get distributed, one job per day.
+ hookenv.local_unit.return_value = 'foo/0'
+ actions.install_maintenance_crontab('')
+ write_file.assert_called_once_with('/etc/cron.d/cassandra-maintenance',
+ ANY)
+ contents = write_file.call_args[0][1]
+ # Not the complete command, but includes all the expanded
+ # variables.
+ expected = (b'\n0 0 * * 0 cassandra run-one-until-success '
+ b'nodetool repair -pr')
+ self.assertIn(expected, contents)
+
+ # Next 7 units distributed 12 hours out of sync with the first
+ # batch.
+ hookenv.local_unit.return_value = 'foo/8'
+ actions.install_maintenance_crontab('')
+ contents = write_file.call_args[0][1]
+ expected = (b'\n0 12 * * 1 cassandra run-one-until-success '
+ b'nodetool repair -pr')
+ self.assertIn(expected, contents)
+
+ # Later units per helpers.week_spread()
+ hookenv.local_unit.return_value = 'foo/411'
+ actions.install_maintenance_crontab('')
+ contents = write_file.call_args[0][1]
+ expected = (b'\n37 8 * * 5 cassandra run-one-until-success '
+ b'nodetool repair -pr')
+ self.assertIn(expected, contents)
+
+ @patch('helpers.emit_netstats')
+ @patch('helpers.emit_status')
+ @patch('helpers.emit_describe_cluster')
+ def test_emit_cluster_info(self, emit_desc, emit_status, emit_netstats):
+ actions.emit_cluster_info('')
+ emit_desc.assert_called_once_with()
+ emit_status.assert_called_once_with()
+ emit_netstats.assert_called_once_with()
+
+ @patch('charmhelpers.core.hookenv.relations_of_type')
+ @patch('actions.ufw')
+ def test_configure_firewall(self, ufw, rel_of_type):
+ rel_of_type.side_effect = iter([[{'private-address': '1.1.0.1'},
+ {'private-address': '1.1.0.2'}],
+ []])
+ actions.configure_firewall('')
+
+ # Confirm our mock provided the expected data.
+ rel_of_type.assert_has_calls([call('cluster'), call('database-admin')])
+
+ ufw.enable.assert_called_once_with(soft_fail=True) # Always enabled.
+
+ # SSH and the client protocol ports are always fully open.
+ ufw.service.assert_has_calls([call('ssh', 'open'),
+ call('nrpe', 'open'),
+ call('rsync', 'open'),
+ call(9042, 'open'),
+ call(9160, 'open')])
+
+ # This test is running for the first time, so there are no
+ # previously applied rules to remove. It opens necessary access
+ # to peers and other related units. The 1.1.* addresses are
+ # peers, and they get storage (7000), ssl_storage (7001),
+ # JMX (7199), Thrift (9160) and native (9042). The remaining
+ # addresses are clients, getting just Thrift and native.
+ ufw.grant_access.assert_has_calls([call('1.1.0.1', 'any', 7000),
+ call('1.1.0.1', 'any', 7001),
+
+ call('1.1.0.2', 'any', 7000),
+ call('1.1.0.2', 'any', 7001)],
+ any_order=True)
+
+ # If things change in a later hook, unwanted rules are removed
+ # and new ones added.
+ config = hookenv.config()
+ config.save()
+ config.load_previous()
+ config['native_transport_port'] = 7777 # 9042 -> 7777
+ config['storage_port'] = 7002 # 7000 -> 7002
+ config['open_client_ports'] = True
+ ufw.reset_mock()
+
+ rel_of_type.side_effect = iter([[],
+ [{'private-address': '1.1.0.1'},
+ {'private-address': '1.1.0.2'}]])
+ actions.configure_firewall('')
+
+ # Three ports now globally open. Yes, having the globally open
+ # native and Thrift ports does make the later more specific
+ # rules meaningless, but we add the specific rules anyway.
+ ufw.service.assert_has_calls([call('ssh', 'open'),
+ call('nrpe', 'open'),
+ call(9042, 'close'),
+ call(7777, 'open'),
+ call(9160, 'open')], any_order=True)
+ ufw.revoke_access.assert_has_calls([call('1.1.0.1', 'any', 7000),
+ call('1.1.0.2', 'any', 7000)],
+ any_order=True)
+ ufw.grant_access.assert_has_calls([call('1.1.0.1', 'any', 7001),
+ call('1.1.0.1', 'any', 7002),
+ call('1.1.0.2', 'any', 7001),
+ call('1.1.0.2', 'any', 7002)],
+ any_order=True)
+
+ @patch('helpers.mountpoint')
+ @patch('helpers.get_cassandra_version')
+ @patch('charmhelpers.core.host.write_file')
+ @patch('charmhelpers.contrib.charmsupport.nrpe.NRPE')
+ @patch('helpers.local_plugins_dir')
+ def test_nrpe_external_master_relation(self, local_plugins_dir, nrpe,
+ write_file, cassandra_version,
+ mountpoint):
+ mountpoint.side_effect = os.path.dirname
+ cassandra_version.return_value = '2.2'
+ # The fake charm_dir() needs populating.
+ plugin_src_dir = os.path.join(os.path.dirname(__file__),
+ os.pardir, 'files')
+ shutil.copytree(plugin_src_dir,
+ os.path.join(hookenv.charm_dir(), 'files'))
+
+ with tempfile.TemporaryDirectory() as d:
+ local_plugins_dir.return_value = d
+ actions.nrpe_external_master_relation('')
+
+ # The expected file was written to the expected filename
+ # with required perms.
+ with open(os.path.join(plugin_src_dir, 'check_cassandra_heap.sh'),
+ 'rb') as f:
+ write_file.assert_called_once_with(
+ os.path.join(d, 'check_cassandra_heap.sh'), f.read(),
+ perms=0o555)
+
+ nrpe().add_check.assert_has_calls([
+ call(shortname='cassandra_heap',
+ description='Check Cassandra Heap',
+ check_cmd='check_cassandra_heap.sh localhost 80 90'),
+ call(description=('Check Cassandra Disk '
+ '/var/lib/cassandra'),
+ shortname='cassandra_disk_var_lib_cassandra',
+ check_cmd=('check_disk -u GB -w 50% -c 25% -K 5% '
+ '-p /var/lib/cassandra'))],
+ any_order=True)
+
+ nrpe().write.assert_called_once_with()
+
+ @patch('helpers.get_cassandra_version')
+ @patch('charmhelpers.core.host.write_file')
+ @patch('os.path.exists')
+ @patch('charmhelpers.contrib.charmsupport.nrpe.NRPE')
+ def test_nrpe_external_master_relation_no_local(self, nrpe, exists,
+ write_file, ver):
+ ver.return_value = '2.2'
+ # If the local plugins directory doesn't exist, we don't attempt
+ # to write files to it. Wait until the subordinate has set it
+ # up.
+ exists.return_value = False
+ actions.nrpe_external_master_relation('')
+ self.assertFalse(write_file.called)
+
+ @patch('helpers.mountpoint')
+ @patch('helpers.get_cassandra_version')
+ @patch('os.path.exists')
+ @patch('charmhelpers.contrib.charmsupport.nrpe.NRPE')
+ def test_nrpe_external_master_relation_disable_heapchk(self, nrpe, exists,
+ ver, mountpoint):
+ ver.return_value = '2.2'
+ exists.return_value = False
+ mountpoint.side_effect = os.path.dirname
+
+ # Disable our checks
+ config = hookenv.config()
+ config['nagios_heapchk_warn_pct'] = 0 # Only one needs to be disabled.
+ config['nagios_heapchk_crit_pct'] = 90
+
+ actions.nrpe_external_master_relation('')
+ exists.assert_called_once_with(helpers.local_plugins_dir())
+
+ nrpe().add_check.assert_has_calls([
+ call(shortname='cassandra_disk_var_lib_cassandra',
+ description=ANY, check_cmd=ANY)], any_order=True)
+
+ @patch('helpers.get_cassandra_version')
+ @patch('os.path.exists')
+ @patch('charmhelpers.contrib.charmsupport.nrpe.NRPE')
+ def test_nrpe_external_master_relation_disable_diskchk(self, nrpe,
+ exists, ver):
+ ver.return_value = '2.2'
+ exists.return_value = False
+
+ # Disable our checks
+ config = hookenv.config()
+ config['nagios_disk_warn_pct'] = 0 # Only one needs to be disabled.
+ config['magios_disk_crit_pct'] = 50
+
+ actions.nrpe_external_master_relation('')
+ exists.assert_called_once_with(helpers.local_plugins_dir())
+
+ nrpe().add_check.assert_called_once_with(shortname='cassandra_heap',
+ description=ANY,
+ check_cmd=ANY)
+
+ @patch('helpers.get_bootstrapped_ips')
+ @patch('helpers.get_seed_ips')
+ @patch('charmhelpers.core.hookenv.leader_set')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_maintain_seeds(self, is_leader, leader_set,
+ seed_ips, bootstrapped_ips):
+ is_leader.return_value = True
+
+ seed_ips.return_value = set(['1.2.3.4'])
+ bootstrapped_ips.return_value = set(['2.2.3.4', '3.2.3.4',
+ '4.2.3.4', '5.2.3.4'])
+
+ actions.maintain_seeds('')
+ leader_set.assert_called_once_with(seeds='2.2.3.4,3.2.3.4,4.2.3.4')
+
+ @patch('helpers.get_bootstrapped_ips')
+ @patch('helpers.get_seed_ips')
+ @patch('charmhelpers.core.hookenv.leader_set')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_maintain_seeds_start(self, is_leader, leader_set,
+ seed_ips, bootstrapped_ips):
+ seed_ips.return_value = set()
+ bootstrapped_ips.return_value = set()
+ actions.maintain_seeds('')
+ # First seed is the first leader, which lets is get everything
+ # started.
+ leader_set.assert_called_once_with(seeds=hookenv.unit_private_ip())
+
+ @patch('charmhelpers.core.host.pwgen')
+ @patch('helpers.query')
+ @patch('helpers.set_unit_superusers')
+ @patch('helpers.ensure_user')
+ @patch('helpers.encrypt_password')
+ @patch('helpers.superuser_credentials')
+ @patch('helpers.connect')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ @patch('charmhelpers.core.hookenv.leader_set')
+ @patch('charmhelpers.core.hookenv.leader_get')
+ def test_reset_default_password(self, leader_get, leader_set, is_leader,
+ connect, sup_creds, encrypt_password,
+ ensure_user, set_sups, query, pwgen):
+ is_leader.return_value = True
+ leader_get.return_value = None
+ connect().__enter__.return_value = sentinel.session
+ connect().__exit__.return_value = False
+ connect.reset_mock()
+
+ sup_creds.return_value = (sentinel.username, sentinel.password)
+ encrypt_password.return_value = sentinel.pwhash
+ pwgen.return_value = sentinel.random_password
+
+ actions.reset_default_password('')
+
+ # First, a superuser account for the unit was created.
+ connect.assert_called_once_with('cassandra', 'cassandra',
+ timeout=120, auth_timeout=120)
+ encrypt_password.assert_called_once_with(sentinel.password)
+ ensure_user.assert_called_once_with(sentinel.session,
+ sentinel.username,
+ sentinel.pwhash,
+ superuser=True)
+ set_sups.assert_called_once_with([hookenv.local_unit()])
+
+ # After that, the default password is reset.
+ query.assert_called_once_with(sentinel.session,
+ 'ALTER USER cassandra WITH PASSWORD %s',
+ cassandra.ConsistencyLevel.ALL,
+ (sentinel.random_password,))
+
+ # Flag stored to avoid attempting this again.
+ leader_set.assert_called_once_with(default_admin_password_changed=True)
+
+ @patch('helpers.connect')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ @patch('charmhelpers.core.hookenv.leader_get')
+ def test_reset_default_password_noop(self, leader_get, is_leader, connect):
+ leader_get.return_value = True
+ is_leader.return_value = True
+ actions.reset_default_password('') # noop
+ self.assertFalse(connect.called)
+
+ @patch('helpers.get_seed_ips')
+ @patch('helpers.status_set')
+ @patch('charmhelpers.core.hookenv.status_get')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_set_active(self, is_leader, status_get, status_set, seed_ips):
+ is_leader.return_value = False
+ status_get.return_value = ('waiting', '')
+ seed_ips.return_value = set()
+ actions.set_active('')
+ status_set.assert_called_once_with('active', 'Live node')
+
+ @patch('helpers.get_seed_ips')
+ @patch('helpers.status_set')
+ @patch('charmhelpers.core.hookenv.status_get')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_set_active_seed(self, is_leader,
+ status_get, status_set, seed_ips):
+ is_leader.return_value = False
+ status_get.return_value = ('waiting', '')
+ seed_ips.return_value = set([hookenv.unit_private_ip()])
+ actions.set_active('')
+ status_set.assert_called_once_with('active', 'Live seed')
+
+ @patch('helpers.num_nodes')
+ @patch('helpers.get_seed_ips')
+ @patch('helpers.service_status_set')
+ @patch('helpers.status_set')
+ @patch('charmhelpers.core.hookenv.status_get')
+ @patch('charmhelpers.core.hookenv.is_leader')
+ def test_set_active_service(self, is_leader,
+ status_get, status_set, service_status_set,
+ seed_ips, num_nodes):
+ status_get.return_value = ('waiting', '')
+ is_leader.return_value = True
+ seed_ips.return_value = set([hookenv.unit_private_ip()])
+ num_nodes.return_value = 1
+ actions.set_active('')
+ service_status_set.assert_called_once_with('active',
+ 'Single node cluster')
+
+ service_status_set.reset_mock()
+ num_nodes.return_value = 6
+ actions.set_active('')
+ service_status_set.assert_called_once_with('active',
+ '6 node cluster')
+
+ @patch('helpers.encrypt_password')
+ @patch('helpers.superuser_credentials')
+ @patch('helpers.peer_relid')
+ def test_request_unit_superuser(self, peer_relid, sup_creds, crypt):
+ peer_relid.return_value = sentinel.peer_relid
+ sup_creds.return_value = (sentinel.username, sentinel.password)
+ crypt.return_value = sentinel.pwhash
+ hookenv.relation_get.return_value = dict()
+ actions.request_unit_superuser('')
+ hookenv.relation_set.assert_called_once_with(
+ sentinel.peer_relid,
+ username=sentinel.username, pwhash=sentinel.pwhash)
+
+ @patch('helpers.update_hosts_file')
+ @patch('socket.gethostname')
+ def test_update_etc_hosts(self, gethostname, update_hosts_file):
+ gethostname.return_value = sentinel.hostname
+ actions.update_etc_hosts('')
+ update_hosts_file.assert_called_once_with(
+ '/etc/hosts', {'10.20.0.1': sentinel.hostname})
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/charms/trusty/cassandra/tests/test_definitions.py b/charms/trusty/cassandra/tests/test_definitions.py
new file mode 100755
index 0000000..98103c0
--- /dev/null
+++ b/charms/trusty/cassandra/tests/test_definitions.py
@@ -0,0 +1,104 @@
+#!.venv3/bin/python3
+
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from itertools import chain
+import functools
+import unittest
+from unittest.mock import patch
+
+from charmhelpers.core import hookenv
+from charmhelpers.core.services import ServiceManager
+
+from tests.base import TestCaseBase
+
+import definitions
+
+
+patch = functools.partial(patch, autospec=True)
+
+
+class TestDefinitions(TestCaseBase):
+ def test_get_service_definitions(self):
+ # We can't really test this in unit tests, but at least we can
+ # ensure the basic data structure is returned and accepted.
+ defs = definitions.get_service_definitions()
+ self.assertIsInstance(defs, list)
+ for d in defs:
+ with self.subTest(d=d):
+ self.assertIsInstance(d, dict)
+
+ def test_get_service_definitions_open_ports(self):
+ config = hookenv.config()
+ defs = definitions.get_service_definitions()
+ expected_ports = set([config['rpc_port'],
+ config['native_transport_port'],
+ config['storage_port'],
+ config['ssl_storage_port']])
+ opened_ports = set(chain(*(d.get('ports', []) for d in defs)))
+ self.assertSetEqual(opened_ports, expected_ports)
+
+ def test_get_service_manager(self):
+ self.assertIsInstance(definitions.get_service_manager(),
+ ServiceManager)
+
+ @patch('helpers.get_unit_superusers')
+ @patch('helpers.is_decommissioned')
+ @patch('helpers.is_cassandra_running')
+ def test_requires_live_node(self, is_running, is_decommissioned, get_sup):
+ is_decommissioned.return_value = False # Is not decommissioned.
+ is_running.return_value = True # Is running.
+ get_sup.return_value = set([hookenv.local_unit()]) # Creds exist.
+
+ self.assertTrue(bool(definitions.RequiresLiveNode()))
+
+ @patch('helpers.get_unit_superusers')
+ @patch('helpers.is_decommissioned')
+ @patch('helpers.is_cassandra_running')
+ def test_requires_live_node_decommissioned(self, is_running,
+ is_decommissioned, get_sup):
+ is_decommissioned.return_value = True # Is decommissioned.
+ is_running.return_value = True # Is running.
+ get_sup.return_value = set([hookenv.local_unit()]) # Creds exist.
+
+ self.assertFalse(bool(definitions.RequiresLiveNode()))
+
+ @patch('helpers.get_unit_superusers')
+ @patch('helpers.is_decommissioned')
+ @patch('helpers.is_cassandra_running')
+ def test_requires_live_node_down(self, is_running,
+ is_decommissioned, get_sup):
+ is_decommissioned.return_value = False # Is not decommissioned.
+ is_running.return_value = False # Is not running.
+ get_sup.return_value = set([hookenv.local_unit()]) # Creds exist.
+
+ self.assertFalse(bool(definitions.RequiresLiveNode()))
+
+ @patch('helpers.get_unit_superusers')
+ @patch('helpers.is_decommissioned')
+ @patch('helpers.is_cassandra_running')
+ def test_requires_live_node_creds(self, is_running,
+ is_decommissioned, get_sup):
+ is_decommissioned.return_value = False # Is not decommissioned.
+ is_running.return_value = True # Is running.
+ get_sup.return_value = set() # Creds do not exist.
+
+ self.assertFalse(bool(definitions.RequiresLiveNode()))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/charms/trusty/cassandra/tests/test_helpers.py b/charms/trusty/cassandra/tests/test_helpers.py
new file mode 100755
index 0000000..92fa1e8
--- /dev/null
+++ b/charms/trusty/cassandra/tests/test_helpers.py
@@ -0,0 +1,1466 @@
+#!.venv3/bin/python3
+
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from collections import namedtuple
+import errno
+import functools
+from itertools import repeat
+import os.path
+import subprocess
+import tempfile
+from textwrap import dedent
+import unittest
+from unittest.mock import ANY, call, MagicMock, patch, sentinel
+
+from cassandra import AuthenticationFailed, ConsistencyLevel
+from cassandra.cluster import NoHostAvailable
+import yaml
+
+from charmhelpers import fetch
+from charmhelpers.core import hookenv, host
+
+from tests.base import TestCaseBase
+import helpers
+
+
+patch = functools.partial(patch, autospec=True)
+
+
+class TestHelpers(TestCaseBase):
+ @patch('time.sleep')
+ def test_backoff(self, sleep):
+ i = 0
+ for _ in helpers.backoff('foo to bar'):
+ i += 1
+ if i == 10:
+ break
+ sleep.assert_has_calls([
+ call(2), call(4), call(8), call(16), call(32),
+ call(60), call(60), call(60), call(60)])
+
+ i = 0
+ for _ in helpers.backoff('foo to bar', max_pause=10):
+ i += 1
+ if i == 10:
+ break
+ sleep.assert_has_calls([
+ call(2), call(4), call(8), call(10), call(10),
+ call(10), call(10), call(10), call(10)])
+
+ def test_autostart_disabled(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+
+ prc = os.path.join(tmpdir, 'policy-rc.d')
+ prc_backup = prc + '-orig'
+
+ with helpers.autostart_disabled(_policy_rc=prc):
+ # No existing policy-rc.d, so no backup made.
+ self.assertFalse(os.path.exists(prc_backup))
+
+ # A policy-rc.d file has been created that will disable
+ # package autostart per spec (ie. returns a 101 exit code).
+ self.assertTrue(os.path.exists(prc))
+ self.assertEqual(subprocess.call([prc]), 101)
+
+ with helpers.autostart_disabled(_policy_rc=prc):
+ # A second time, we have a backup made.
+ # policy-rc.d still works
+ self.assertTrue(os.path.exists(prc_backup))
+ self.assertEqual(subprocess.call([prc]), 101)
+
+ # Backup removed, and policy-rc.d still works.
+ self.assertFalse(os.path.exists(prc_backup))
+ self.assertEqual(subprocess.call([prc]), 101)
+
+ # Neither backup nor policy-rc.d exist now we are out of the
+ # context manager.
+ self.assertFalse(os.path.exists(prc_backup))
+ self.assertFalse(os.path.exists(prc))
+
+ def test_autostart_disabled_partial(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+
+ prc = os.path.join(tmpdir, 'policy-rc.d')
+ prc_backup = prc + '-orig'
+
+ with helpers.autostart_disabled(['foo', 'bar'], _policy_rc=prc):
+ # No existing policy-rc.d, so no backup made.
+ self.assertFalse(os.path.exists(prc_backup))
+
+ # A policy-rc.d file has been created that will disable
+ # package autostart per spec (ie. returns a 101 exit code).
+ self.assertTrue(os.path.exists(prc))
+ self.assertEqual(subprocess.call([prc, 'foo']), 101)
+ self.assertEqual(subprocess.call([prc, 'bar']), 101)
+ self.assertEqual(subprocess.call([prc, 'baz']), 0)
+
+ # Neither backup nor policy-rc.d exist now we are out of the
+ # context manager.
+ self.assertFalse(os.path.exists(prc_backup))
+ self.assertFalse(os.path.exists(prc))
+
+ @patch('helpers.autostart_disabled')
+ @patch('charmhelpers.fetch.apt_install')
+ def test_install_packages(self, apt_install, autostart_disabled):
+ packages = ['a_pack', 'b_pack']
+ helpers.install_packages(packages)
+
+ # All packages got installed, and hook aborted if package
+ # installation failed.
+ apt_install.assert_called_once_with(['a_pack', 'b_pack'], fatal=True)
+
+ # The autostart_disabled context manager was used to stop
+ # package installation starting services.
+ autostart_disabled().__enter__.assert_called_once_with()
+ autostart_disabled().__exit__.assert_called_once_with(None, None, None)
+
+ @patch('helpers.autostart_disabled')
+ @patch('charmhelpers.fetch.apt_install')
+ def test_install_packages_extras(self, apt_install, autostart_disabled):
+ packages = ['a_pack', 'b_pack']
+ hookenv.config()['extra_packages'] = 'c_pack d_pack'
+ helpers.install_packages(packages)
+
+ # All packages got installed, and hook aborted if package
+ # installation failed.
+ apt_install.assert_called_once_with(['a_pack', 'b_pack',
+ 'c_pack', 'd_pack'], fatal=True)
+
+ # The autostart_disabled context manager was used to stop
+ # package installation starting services.
+ autostart_disabled().__enter__.assert_called_once_with()
+ autostart_disabled().__exit__.assert_called_once_with(None, None, None)
+
+ @patch('helpers.autostart_disabled')
+ @patch('charmhelpers.fetch.apt_install')
+ def test_install_packages_noop(self, apt_install, autostart_disabled):
+ # Everything is already installed. Nothing to do.
+ fetch.filter_installed_packages.side_effect = lambda pkgs: []
+
+ packages = ['a_pack', 'b_pack']
+ hookenv.config()['extra_packages'] = 'c_pack d_pack'
+ helpers.install_packages(packages)
+
+ # All packages got installed, and hook aborted if package
+ # installation failed.
+ self.assertFalse(apt_install.called)
+
+ # Autostart wasn't messed with.
+ self.assertFalse(autostart_disabled.called)
+
+ @patch('subprocess.Popen')
+ def test_ensure_package_status(self, popen):
+ for status in ['install', 'hold']:
+ with self.subTest(status=status):
+ popen.reset_mock()
+ hookenv.config()['package_status'] = status
+ helpers.ensure_package_status(['a_pack', 'b_pack'])
+
+ selections = 'a_pack {}\nb_pack {}\n'.format(
+ status, status).encode('US-ASCII')
+
+ self.assertEqual(
+ [call(['dpkg', '--set-selections'], stdin=subprocess.PIPE),
+ call().communicate(input=selections)], popen.mock_calls)
+
+ popen.reset_mock()
+ hookenv.config()['package_status'] = 'invalid'
+ self.assertRaises(RuntimeError,
+ helpers.ensure_package_status, ['a_pack', 'b_back'])
+ self.assertFalse(popen.called)
+
+ @patch('charmhelpers.core.hookenv.leader_get')
+ def test_get_seed_ips(self, leader_get):
+ leader_get.return_value = '1.2.3.4,5.6.7.8'
+ self.assertSetEqual(helpers.get_seed_ips(), set(['1.2.3.4',
+ '5.6.7.8']))
+
+ @patch('helpers.read_cassandra_yaml')
+ def test_actual_seed_ips(self, read_yaml):
+ read_yaml.return_value = yaml.load(dedent('''\
+ seed_provider:
+ - class_name: blah
+ parameters:
+ - seeds: a,b,c
+ '''))
+ self.assertSetEqual(helpers.actual_seed_ips(),
+ set(['a', 'b', 'c']))
+
+ @patch('relations.StorageRelation')
+ def test_get_database_directory(self, storage_relation):
+ storage_relation().mountpoint = None
+
+ # Relative paths are relative to /var/lib/cassandra
+ self.assertEqual(helpers.get_database_directory('bar'),
+ '/var/lib/cassandra/bar')
+
+ # If there is an external mount, relative paths are relative to
+ # it. Note the extra 'cassandra' directory - life is easier
+ # if we store all our data in a subdirectory on the external
+ # mount rather than in its root.
+ storage_relation().mountpoint = '/srv/foo'
+ self.assertEqual(helpers.get_database_directory('bar'),
+ '/srv/foo/cassandra/bar')
+
+ # Absolute paths are absolute and passed through unmolested.
+ self.assertEqual(helpers.get_database_directory('/bar'), '/bar')
+
+ @patch('helpers.get_cassandra_version')
+ @patch('relations.StorageRelation')
+ def test_get_all_database_directories(self, storage_relation, ver):
+ ver.return_value = '2.2'
+ storage_relation().mountpoint = '/s'
+ self.assertDictEqual(
+ helpers.get_all_database_directories(),
+ dict(data_file_directories=['/s/cassandra/data'],
+ commitlog_directory='/s/cassandra/commitlog',
+ saved_caches_directory='/s/cassandra/saved_caches'))
+
+ @patch('helpers.get_cassandra_version')
+ @patch('relations.StorageRelation')
+ def test_get_all_database_directories_30(self, storage_relation, ver):
+ ver.return_value = '3.0'
+ storage_relation().mountpoint = '/s'
+ self.assertDictEqual(
+ helpers.get_all_database_directories(),
+ dict(data_file_directories=['/s/cassandra/data'],
+ commitlog_directory='/s/cassandra/commitlog',
+ saved_caches_directory='/s/cassandra/saved_caches',
+ hints_directory='/s/cassandra/hints'))
+
+ @patch('helpers.recursive_chown')
+ @patch('charmhelpers.core.host.mkdir')
+ @patch('helpers.get_database_directory')
+ @patch('helpers.is_cassandra_running')
+ def test_ensure_database_directory(self, is_running, get_db_dir, mkdir,
+ recursive_chown):
+ absdir = '/an/absolute/dir'
+ is_running.return_value = False
+ get_db_dir.return_value = absdir
+
+ # ensure_database_directory() returns the absolute path.
+ self.assertEqual(helpers.ensure_database_directory(absdir), absdir)
+
+ # The directory will have been made.
+ mkdir.assert_has_calls([
+ call('/an'),
+ call('/an/absolute'),
+ call('/an/absolute/dir',
+ owner='cassandra', group='cassandra', perms=0o750)])
+
+ # The ownership of the contents has not been reset. Rather than
+ # attempting to remount an existing database, which requires
+ # resetting permissions, it is better to use sstableloader to
+ # import the data into the cluster.
+ self.assertFalse(recursive_chown.called)
+
+ @patch('charmhelpers.core.host.write_file')
+ @patch('os.path.isdir')
+ @patch('subprocess.check_output')
+ def test_set_io_scheduler(self, check_output, isdir, write_file):
+ # Normal operation, the device is detected and the magic
+ # file written.
+ check_output.return_value = 'foo\n/dev/sdq 1 2 3 1% /foo\n'
+ isdir.return_value = True
+
+ helpers.set_io_scheduler('fnord', '/foo')
+
+ write_file.assert_called_once_with('/sys/block/sdq/queue/scheduler',
+ b'fnord', perms=0o644)
+
+ # Some OSErrors we log warnings for, and continue.
+ for e in (errno.EACCES, errno.ENOENT):
+ with self.subTest(errno=e):
+ write_file.side_effect = repeat(OSError(e, 'Whoops'))
+ hookenv.log.reset_mock()
+ helpers.set_io_scheduler('fnord', '/foo')
+ hookenv.log.assert_has_calls([call(ANY),
+ call(ANY, hookenv.WARNING)])
+
+ # Other OSErrors just fail hard.
+ write_file.side_effect = iter([OSError(errno.EFAULT, 'Whoops')])
+ self.assertRaises(OSError, helpers.set_io_scheduler, 'fnord', '/foo')
+
+ # If we are not under lxc, nothing happens at all except a log
+ # message.
+ helpers.is_lxc.return_value = True
+ hookenv.log.reset_mock()
+ write_file.reset_mock()
+ helpers.set_io_scheduler('fnord', '/foo')
+ self.assertFalse(write_file.called)
+ hookenv.log.assert_called_once_with(ANY) # A single INFO message.
+
+ @patch('shutil.chown')
+ def test_recursive_chown(self, chown):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ os.makedirs(os.path.join(tmpdir, 'a', 'bb', 'ccc'))
+ with open(os.path.join(tmpdir, 'top file'), 'w') as f:
+ f.write('top file')
+ with open(os.path.join(tmpdir, 'a', 'bb', 'midfile'), 'w') as f:
+ f.write('midfile')
+ helpers.recursive_chown(tmpdir, 'un', 'gn')
+ chown.assert_has_calls(
+ [call(os.path.join(tmpdir, 'a'), 'un', 'gn'),
+ call(os.path.join(tmpdir, 'a', 'bb'), 'un', 'gn'),
+ call(os.path.join(tmpdir, 'a', 'bb', 'ccc'), 'un', 'gn'),
+ call(os.path.join(tmpdir, 'top file'), 'un', 'gn'),
+ call(os.path.join(tmpdir, 'a', 'bb', 'midfile'), 'un', 'gn')],
+ any_order=True)
+
+ def test_maybe_backup(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Our file is backed up to a .orig
+ path = os.path.join(tmpdir, 'foo.conf')
+ host.write_file(path, b'hello', perms=0o644)
+ helpers.maybe_backup(path)
+ path_orig = path + '.orig'
+ self.assertTrue(os.path.exists(path_orig))
+ with open(path_orig, 'rb') as f:
+ self.assertEqual(f.read(), b'hello')
+ # Safe permissions
+ self.assertEqual(os.lstat(path_orig).st_mode & 0o777, 0o600)
+
+ # A second call, nothing happens as the .orig is already
+ # there.
+ host.write_file(path, b'second')
+ helpers.maybe_backup(path)
+ with open(path_orig, 'rb') as f:
+ self.assertEqual(f.read(), b'hello')
+
+ @patch('charmhelpers.fetch.apt_cache')
+ def test_get_package_version(self, apt_cache):
+ version = namedtuple('Version', 'ver_str')('1.0-foo')
+ package = namedtuple('Package', 'current_ver')(version)
+ apt_cache.return_value = dict(package=package)
+ ver = helpers.get_package_version('package')
+ self.assertEqual(ver, '1.0-foo')
+
+ @patch('charmhelpers.fetch.apt_cache')
+ def test_get_package_version_not_found(self, apt_cache):
+ version = namedtuple('Version', 'ver_str')('1.0-foo')
+ package = namedtuple('Package', 'current_ver')(version)
+ apt_cache.return_value = dict(package=package)
+ self.assertIsNone(helpers.get_package_version('notfound'))
+
+ @patch('charmhelpers.fetch.apt_cache')
+ def test_get_package_version_not_installed(self, apt_cache):
+ package = namedtuple('Package', 'current_ver')(None)
+ apt_cache.return_value = dict(package=package)
+ self.assertIsNone(helpers.get_package_version('package'))
+
+ def test_get_jre(self):
+ hookenv.config()['jre'] = 'opEnjdk' # Case insensitive
+ self.assertEqual(helpers.get_jre(), 'openjdk')
+
+ hookenv.config()['jre'] = 'oRacle' # Case insensitive
+ self.assertEqual(helpers.get_jre(), 'oracle')
+
+ def test_get_jre_unknown(self):
+ hookenv.config()['jre'] = 'OopsJDK'
+ self.assertEqual(helpers.get_jre(), 'openjdk')
+ # An error was logged.
+ hookenv.log.assert_called_once_with(ANY, hookenv.ERROR)
+
+ def test_get_jre_dse_override(self):
+ hookenv.config()['edition'] = 'dse'
+ self.assertEqual(helpers.get_jre(), 'oracle')
+
+ def test_get_cassandra_edition(self):
+ hookenv.config()['edition'] = 'community'
+ self.assertEqual(helpers.get_cassandra_edition(), 'community')
+
+ hookenv.config()['edition'] = 'DSE' # Case insensitive
+ self.assertEqual(helpers.get_cassandra_edition(), 'dse')
+
+ self.assertFalse(hookenv.log.called)
+
+ hookenv.config()['edition'] = 'typo' # Default to community
+ self.assertEqual(helpers.get_cassandra_edition(), 'community')
+ hookenv.log.assert_any_call(ANY, hookenv.ERROR) # Logs an error.
+
+ @patch('helpers.get_cassandra_edition')
+ def test_get_cassandra_service(self, get_edition):
+ get_edition.return_value = 'whatever'
+ self.assertEqual(helpers.get_cassandra_service(), 'cassandra')
+ get_edition.return_value = 'dse'
+ self.assertEqual(helpers.get_cassandra_service(), 'dse')
+
+ def test_get_cassandra_service_dse_override(self):
+ hookenv.config()['edition'] = 'dse'
+ self.assertEqual(helpers.get_cassandra_service(), 'dse')
+
+ @patch('helpers.get_package_version')
+ def test_get_cassandra_version(self, get_package_version):
+ # Return cassandra package version if it is installed.
+ get_package_version.return_value = '1.2.3-2~64'
+ self.assertEqual(helpers.get_cassandra_version(), '1.2.3-2~64')
+ get_package_version.assert_called_with('cassandra')
+
+ @patch('helpers.get_package_version')
+ def test_get_cassandra_version_uninstalled(self, get_package_version):
+ # Return none if the main cassandra package is not installed
+ get_package_version.return_value = None
+ self.assertEqual(helpers.get_cassandra_version(), None)
+ get_package_version.assert_called_with('cassandra')
+
+ @patch('helpers.get_package_version')
+ def test_get_cassandra_version_dse(self, get_package_version):
+ # Return the cassandra version equivalent if using dse.
+ hookenv.config()['edition'] = 'dse'
+ get_package_version.return_value = '4.7-beta2~88'
+ self.assertEqual(helpers.get_cassandra_version(), '2.1')
+ get_package_version.assert_called_with('dse-full')
+
+ @patch('helpers.get_package_version')
+ def test_get_cassandra_version_dse_uninstalled(self, get_package_version):
+ # Return the cassandra version equivalent if using dse.
+ hookenv.config()['edition'] = 'dse'
+ get_package_version.return_value = None
+ self.assertEqual(helpers.get_cassandra_version(), None)
+ get_package_version.assert_called_with('dse-full')
+
+ def test_get_cassandra_config_dir(self):
+ self.assertEqual(helpers.get_cassandra_config_dir(),
+ '/etc/cassandra')
+ hookenv.config()['edition'] = 'dse'
+ self.assertEqual(helpers.get_cassandra_config_dir(),
+ '/etc/dse/cassandra')
+
+ @patch('helpers.get_cassandra_config_dir')
+ def test_get_cassandra_yaml_file(self, get_cassandra_config_dir):
+ get_cassandra_config_dir.return_value = '/foo'
+ self.assertEqual(helpers.get_cassandra_yaml_file(),
+ '/foo/cassandra.yaml')
+
+ @patch('helpers.get_cassandra_config_dir')
+ def test_get_cassandra_env_file(self, get_cassandra_config_dir):
+ get_cassandra_config_dir.return_value = '/foo'
+ self.assertEqual(helpers.get_cassandra_env_file(),
+ '/foo/cassandra-env.sh')
+
+ @patch('helpers.get_cassandra_config_dir')
+ def test_get_cassandra_rackdc_file(self, get_cassandra_config_dir):
+ get_cassandra_config_dir.return_value = '/foo'
+ self.assertEqual(helpers.get_cassandra_rackdc_file(),
+ '/foo/cassandra-rackdc.properties')
+
+ @patch('helpers.get_cassandra_edition')
+ def test_get_cassandra_pid_file(self, get_edition):
+ get_edition.return_value = 'whatever'
+ self.assertEqual(helpers.get_cassandra_pid_file(),
+ '/var/run/cassandra/cassandra.pid')
+ get_edition.return_value = 'dse'
+ self.assertEqual(helpers.get_cassandra_pid_file(),
+ '/var/run/dse/dse.pid')
+
+ def test_get_cassandra_packages(self):
+ # Default
+ self.assertSetEqual(helpers.get_cassandra_packages(),
+ set(['cassandra', 'ntp', 'run-one',
+ 'netcat', 'openjdk-8-jre-headless']))
+
+ def test_get_cassandra_packages_oracle_jre(self):
+ # Oracle JRE
+ hookenv.config()['jre'] = 'oracle'
+ self.assertSetEqual(helpers.get_cassandra_packages(),
+ set(['cassandra', 'ntp', 'run-one', 'netcat']))
+
+ def test_get_cassandra_packages_dse(self):
+ # DataStax Enterprise, and implicit Oracle JRE.
+ hookenv.config()['edition'] = 'dsE' # Insensitive.
+ self.assertSetEqual(helpers.get_cassandra_packages(),
+ set(['dse-full', 'ntp', 'run-one', 'netcat']))
+
+ @patch('helpers.get_cassandra_service')
+ @patch('charmhelpers.core.host.service_stop')
+ @patch('helpers.is_cassandra_running')
+ def test_stop_cassandra(self, is_cassandra_running,
+ service_stop, get_service):
+ get_service.return_value = sentinel.service_name
+ is_cassandra_running.side_effect = iter([True, False])
+ helpers.stop_cassandra()
+ service_stop.assert_called_once_with(sentinel.service_name)
+
+ @patch('helpers.get_cassandra_service')
+ @patch('charmhelpers.core.host.service_stop')
+ @patch('helpers.is_cassandra_running')
+ def test_stop_cassandra_noop(self, is_cassandra_running,
+ service_stop, get_service):
+ get_service.return_value = sentinel.service_name
+ is_cassandra_running.return_value = False
+ helpers.stop_cassandra()
+ self.assertFalse(service_stop.called)
+
+ @patch('charmhelpers.core.hookenv.status_set')
+ @patch('helpers.get_cassandra_service')
+ @patch('charmhelpers.core.host.service_stop')
+ @patch('helpers.is_cassandra_running')
+ def test_stop_cassandra_failure(self, is_cassandra_running,
+ service_stop, get_service, status_set):
+ get_service.return_value = sentinel.service_name
+ is_cassandra_running.side_effect = iter([True, True])
+ self.assertRaises(SystemExit, helpers.stop_cassandra)
+ service_stop.assert_called_once_with(sentinel.service_name)
+ status_set.assert_called_once_with('blocked',
+ 'Cassandra failed to shut down')
+
+ @patch('helpers.actual_seed_ips')
+ @patch('time.sleep')
+ @patch('helpers.get_cassandra_service')
+ @patch('charmhelpers.core.host.service_start')
+ @patch('helpers.is_cassandra_running')
+ def test_start_cassandra(self, is_cassandra_running,
+ service_start, get_service, sleep, seed_ips):
+ get_service.return_value = sentinel.service_name
+ seed_ips.return_value = set(['1.2.3.4'])
+ is_cassandra_running.return_value = True
+ helpers.start_cassandra()
+ self.assertFalse(service_start.called)
+
+ is_cassandra_running.side_effect = iter([False, False, False, True])
+ helpers.start_cassandra()
+ service_start.assert_called_once_with(sentinel.service_name)
+
+ # A side effect of starting cassandra is storing the current live
+ # seed list, so we can tell when it has changed.
+ self.assertEqual(hookenv.config()['configured_seeds'], ['1.2.3.4'])
+
+ @patch('os.chmod')
+ @patch('helpers.is_cassandra_running')
+ @patch('relations.StorageRelation')
+ def test_remount_cassandra(self, storage, is_running, chmod):
+ config = hookenv.config()
+ storage().needs_remount.return_value = True
+ storage().mountpoint = '/srv/foo'
+ is_running.return_value = False
+ config['data_file_directories'] = '/srv/ext/data1 data2'
+ config['bootstrapped_into_cluster'] = True
+
+ helpers.remount_cassandra()
+ storage().migrate.assert_called_once_with('/var/lib/cassandra',
+ 'cassandra')
+ chmod.assert_called_once_with('/srv/foo/cassandra', 0o750)
+ self.assertEqual(config['bootstrapped_into_cluster'], False)
+
+ @patch('os.chmod')
+ @patch('helpers.is_cassandra_running')
+ @patch('relations.StorageRelation')
+ def test_remount_cassandra_noop(self, storage, is_running, chmod):
+ storage().needs_remount.return_value = False
+ storage().mountpoint = None
+ is_running.return_value = False
+
+ helpers.remount_cassandra()
+ self.assertFalse(storage().migrate.called)
+ self.assertFalse(chmod.called)
+
+ @patch('helpers.is_cassandra_running')
+ @patch('relations.StorageRelation')
+ def test_remount_cassandra_unmount(self, storage, is_running):
+ storage().needs_remount.return_value = True
+ storage().mountpoint = None # Reverting to local disk.
+ is_running.return_value = False
+ hookenv.config()['data_file_directories'] = '/srv/ext/data1 data2'
+
+ helpers.remount_cassandra()
+
+ # We cannot migrate data back to local disk, as by the time our
+ # hooks are called the data is gone.
+ self.assertFalse(storage().migrate.called)
+
+ # We warn in this case, as reverting to local disk may resurrect
+ # old data (if the cluster was ever time while using local
+ # disk).
+ hookenv.log.assert_any_call(ANY, hookenv.WARNING)
+
+ @patch('helpers.ensure_database_directory')
+ @patch('helpers.get_all_database_directories')
+ def test_ensure_database_directories(self, get_all_dirs, ensure_dir):
+ get_all_dirs.return_value = dict(
+ data_file_directories=[sentinel.data_file_dir_1,
+ sentinel.data_file_dir_2],
+ commitlog_directory=sentinel.commitlog_dir,
+ saved_caches_directory=sentinel.saved_caches_dir)
+ helpers.ensure_database_directories()
+ ensure_dir.assert_has_calls([
+ call(sentinel.data_file_dir_1),
+ call(sentinel.data_file_dir_2),
+ call(sentinel.commitlog_dir),
+ call(sentinel.saved_caches_dir)], any_order=True)
+
+ @patch('cassandra.cluster.Cluster')
+ @patch('cassandra.auth.PlainTextAuthProvider')
+ @patch('helpers.superuser_credentials')
+ @patch('helpers.read_cassandra_yaml')
+ def test_connect(self, yaml, creds, auth_provider, cluster):
+ # host and port are pulled from the current active
+ # cassandra.yaml file, rather than configuration, as
+ # configuration may not match reality (if for no other reason
+ # that we are running this code in order to make reality match
+ # the desired configuration).
+ yaml.return_value = dict(rpc_address='1.2.3.4',
+ native_transport_port=666)
+
+ creds.return_value = ('un', 'pw')
+ auth_provider.return_value = sentinel.ap
+
+ cluster().connect.return_value = sentinel.session
+ cluster.reset_mock()
+
+ with helpers.connect() as session:
+ auth_provider.assert_called_once_with(username='un',
+ password='pw')
+ cluster.assert_called_once_with(['1.2.3.4'], port=666,
+ auth_provider=sentinel.ap)
+ self.assertIs(session, sentinel.session)
+ self.assertFalse(cluster().shutdown.called)
+
+ cluster().shutdown.assert_called_once_with()
+
+ @patch('cassandra.cluster.Cluster')
+ @patch('cassandra.auth.PlainTextAuthProvider')
+ @patch('helpers.superuser_credentials')
+ @patch('helpers.read_cassandra_yaml')
+ def test_connect_with_creds(self, yaml, creds, auth_provider, cluster):
+ # host and port are pulled from the current active
+ # cassandra.yaml file, rather than configuration, as
+ # configuration may not match reality (if for no other reason
+ # that we are running this code in order to make reality match
+ # the desired configuration).
+ yaml.return_value = dict(rpc_address='1.2.3.4',
+ native_transport_port=666)
+
+ auth_provider.return_value = sentinel.ap
+
+ with helpers.connect(username='explicit', password='boo'):
+ auth_provider.assert_called_once_with(username='explicit',
+ password='boo')
+
+ @patch('time.sleep')
+ @patch('time.time')
+ @patch('cassandra.cluster.Cluster')
+ @patch('helpers.superuser_credentials')
+ @patch('helpers.read_cassandra_yaml')
+ def test_connect_badauth(self, yaml, creds, cluster, time, sleep):
+ # host and port are pulled from the current active
+ # cassandra.yaml file, rather than configuration, as
+ # configuration may not match reality (if for no other reason
+ # that we are running this code in order to make reality match
+ # the desired configuration).
+ yaml.return_value = dict(rpc_address='1.2.3.4',
+ native_transport_port=666)
+ time.side_effect = [0, 7, 99999]
+
+ creds.return_value = ('un', 'pw')
+
+ x = NoHostAvailable('whoops', {'1.2.3.4': AuthenticationFailed()})
+ cluster().connect.side_effect = x
+
+ self.assertRaises(AuthenticationFailed, helpers.connect().__enter__)
+
+ # Authentication failures are retried, but for a shorter time
+ # than other connection errors which are retried for a few
+ # minutes.
+ self.assertEqual(cluster().connect.call_count, 2)
+ self.assertEqual(cluster().shutdown.call_count, 2)
+
+ @patch('time.sleep')
+ @patch('time.time')
+ @patch('cassandra.cluster.Cluster')
+ @patch('helpers.superuser_credentials')
+ @patch('helpers.read_cassandra_yaml')
+ def test_connect_timeout(self, yaml, creds, cluster, time, sleep):
+ yaml.return_value = dict(rpc_address='1.2.3.4',
+ native_transport_port=666)
+ time.side_effect = [0, 1, 2, 3, 10, 20, 30, 40, 99999]
+
+ creds.return_value = ('un', 'pw')
+
+ x = NoHostAvailable('whoops', {'1.2.3.4': sentinel.exception})
+ cluster().connect.side_effect = x
+
+ self.assertRaises(NoHostAvailable, helpers.connect().__enter__)
+
+ # Authentication failures fail immediately, unlike other
+ # connection errors which are retried.
+ self.assertEqual(cluster().connect.call_count, 5)
+ self.assertEqual(cluster().shutdown.call_count, 5)
+ self.assertEqual(sleep.call_count, 4)
+
+ @patch('cassandra.query.SimpleStatement')
+ def test_query(self, simple_statement):
+ simple_statement.return_value = sentinel.s_statement
+ session = MagicMock()
+ session.execute.return_value = sentinel.results
+ self.assertEqual(helpers.query(session, sentinel.statement,
+ sentinel.consistency, sentinel.args),
+ sentinel.results)
+ simple_statement.assert_called_once_with(
+ sentinel.statement, consistency_level=sentinel.consistency)
+ session.execute.assert_called_once_with(simple_statement(''),
+ sentinel.args)
+
+ @patch('cassandra.query.SimpleStatement')
+ @patch('helpers.backoff')
+ def test_query_retry(self, backoff, simple_statement):
+ backoff.return_value = repeat(True)
+ simple_statement.return_value = sentinel.s_statement
+ session = MagicMock()
+ session.execute.side_effect = iter([RuntimeError(), sentinel.results])
+ self.assertEqual(helpers.query(session, sentinel.statement,
+ sentinel.consistency, sentinel.args),
+ sentinel.results)
+ self.assertEqual(session.execute.call_count, 2)
+
+ @patch('time.time')
+ @patch('cassandra.query.SimpleStatement')
+ @patch('helpers.backoff')
+ def test_query_timeout(self, backoff, simple_statement, time):
+ backoff.return_value = repeat(True)
+ # Timeout is 600
+ time.side_effect = iter([0, 1, 2, 3, 500, 700, RuntimeError()])
+ simple_statement.return_value = sentinel.s_statement
+ session = MagicMock()
+
+ class Whoops(Exception):
+ pass
+
+ session.execute.side_effect = repeat(Whoops('Fail'))
+ self.assertRaises(Whoops, helpers.query, session, sentinel.statement,
+ sentinel.consistency, sentinel.args)
+ self.assertEqual(session.execute.call_count, 4)
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.query')
+ def test_ensure_user(self, query, ver):
+ ver.return_value = '2.1'
+ helpers.ensure_user(sentinel.session,
+ sentinel.username, sentinel.pwhash,
+ superuser=sentinel.supflag)
+ query.assert_has_calls([
+ call(sentinel.session,
+ 'INSERT INTO system_auth.users (name, super) VALUES (%s, %s)',
+ ConsistencyLevel.ALL, (sentinel.username, sentinel.supflag)),
+ call(sentinel.session,
+ 'INSERT INTO system_auth.credentials (username, salted_hash) '
+ 'VALUES (%s, %s)',
+ ConsistencyLevel.ALL,
+ (sentinel.username, sentinel.pwhash))])
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.query')
+ def test_ensure_user_22(self, query, ver):
+ ver.return_value = '2.2'
+ helpers.ensure_user(sentinel.session,
+ sentinel.username, sentinel.pwhash,
+ superuser=sentinel.supflag)
+ query.assert_called_once_with(sentinel.session,
+ 'INSERT INTO system_auth.roles (role, '
+ 'can_login, is_superuser, salted_hash) '
+ 'VALUES (%s, TRUE, %s, %s)',
+ ConsistencyLevel.ALL,
+ (sentinel.username, sentinel.supflag,
+ sentinel.pwhash))
+
+ @patch('helpers.ensure_user')
+ @patch('helpers.encrypt_password')
+ @patch('helpers.nodetool')
+ @patch('helpers.reconfigure_and_restart_cassandra')
+ @patch('helpers.connect')
+ @patch('helpers.superuser_credentials')
+ def test_create_unit_superuser_hard(self, creds, connect, restart,
+ nodetool, encrypt_password,
+ ensure_user):
+ creds.return_value = (sentinel.username, sentinel.password)
+ connect().__enter__.return_value = sentinel.session
+ connect().__exit__.return_value = False
+ connect.reset_mock()
+
+ encrypt_password.return_value = sentinel.pwhash
+
+ helpers.create_unit_superuser_hard()
+
+ # Cassandra was restarted twice, first with authentication
+ # disabled and again with the normal configuration.
+ restart.assert_has_calls([
+ call(dict(authenticator='AllowAllAuthenticator',
+ rpc_address='localhost')),
+ call()])
+
+ # A connection was made as the superuser, which words because
+ # authentication has been disabled on this node.
+ connect.assert_called_once_with()
+
+ # The user was created.
+ encrypt_password.assert_called_once_with(sentinel.password)
+ ensure_user.assert_called_once_with(sentinel.session,
+ sentinel.username,
+ sentinel.pwhash,
+ superuser=True)
+
+ # Local Cassandra was flushed. This is probably unnecessary.
+ nodetool.assert_called_once_with('flush')
+
+ def test_cqlshrc_path(self):
+ self.assertEqual(helpers.get_cqlshrc_path(),
+ '/root/.cassandra/cqlshrc')
+
+ def test_superuser_username(self):
+ self.assertEqual(hookenv.local_unit(), 'service/1')
+ self.assertEqual(helpers.superuser_username(), 'juju_service_1')
+
+ @patch('helpers.superuser_username')
+ @patch('helpers.get_cqlshrc_path')
+ @patch('helpers.get_cassandra_version')
+ @patch('charmhelpers.core.host.pwgen')
+ def test_superuser_credentials_20(self, pwgen, get_cassandra_version,
+ get_cqlshrc_path, get_username):
+ get_cassandra_version.return_value = '2.0'
+ with tempfile.TemporaryDirectory() as dotcassandra_dir:
+ cqlshrc_path = os.path.join(dotcassandra_dir, 'cqlshrc')
+ get_cqlshrc_path.return_value = cqlshrc_path
+ get_username.return_value = 'foo'
+ pwgen.return_value = 'secret'
+ hookenv.config()['rpc_port'] = 666
+ hookenv.config()['native_transport_port'] = 777
+
+ # First time generates username & password.
+ username, password = helpers.superuser_credentials()
+ self.assertEqual(username, 'foo')
+ self.assertEqual(password, 'secret')
+
+ # Credentials are stored in the cqlshrc file.
+ expected_cqlshrc = dedent('''\
+ [authentication]
+ username = foo
+ password = secret
+
+ [connection]
+ hostname = 10.30.0.1
+ port = 666
+ ''').strip()
+ with open(cqlshrc_path, 'r') as f:
+ self.assertEqual(f.read().strip(), expected_cqlshrc)
+
+ # If the credentials have been stored, they are not
+ # regenerated.
+ pwgen.return_value = 'secret2'
+ username, password = helpers.superuser_credentials()
+ self.assertEqual(username, 'foo')
+ self.assertEqual(password, 'secret')
+ with open(cqlshrc_path, 'r') as f:
+ self.assertEqual(f.read().strip(), expected_cqlshrc)
+
+ @patch('helpers.superuser_username')
+ @patch('helpers.get_cqlshrc_path')
+ @patch('helpers.get_cassandra_version')
+ @patch('charmhelpers.core.host.pwgen')
+ def test_superuser_credentials(self, pwgen, get_cassandra_version,
+ get_cqlshrc_path, get_username):
+ # Cassandra 2.1 or higher uses native protocol in its cqlshrc
+ get_cassandra_version.return_value = '2.1'
+ with tempfile.TemporaryDirectory() as dotcassandra_dir:
+ cqlshrc_path = os.path.join(dotcassandra_dir, 'cqlshrc')
+ get_cqlshrc_path.return_value = cqlshrc_path
+ get_username.return_value = 'foo'
+ pwgen.return_value = 'secret'
+ hookenv.config()['rpc_port'] = 666
+ hookenv.config()['native_transport_port'] = 777
+
+ # First time generates username & password.
+ username, password = helpers.superuser_credentials()
+ self.assertEqual(username, 'foo')
+ self.assertEqual(password, 'secret')
+
+ # Credentials are stored in the cqlshrc file.
+ expected_cqlshrc = dedent('''\
+ [authentication]
+ username = foo
+ password = secret
+
+ [connection]
+ hostname = 10.30.0.1
+ port = 777
+ ''').strip()
+ with open(cqlshrc_path, 'r') as f:
+ self.assertEqual(f.read().strip(), expected_cqlshrc)
+
+ @patch('subprocess.check_output')
+ def test_nodetool(self, check_output):
+ check_output.return_value = 'OK'
+ self.assertEqual(helpers.nodetool('status', 'system_auth'), 'OK')
+
+ # The expected command was run against the local node.
+ check_output.assert_called_once_with(
+ ['nodetool', 'status', 'system_auth'],
+ universal_newlines=True, stderr=subprocess.STDOUT, timeout=119)
+
+ # The output was emitted.
+ helpers.emit.assert_called_once_with('OK')
+
+ @patch('helpers.is_cassandra_running')
+ @patch('helpers.backoff')
+ @patch('subprocess.check_output')
+ def test_nodetool_CASSANDRA_8776(self, check_output, backoff, is_running):
+ is_running.return_value = True
+ backoff.return_value = repeat(True)
+ check_output.side_effect = iter(['ONE Error: stuff', 'TWO OK'])
+ self.assertEqual(helpers.nodetool('status'), 'TWO OK')
+
+ # The output was emitted.
+ helpers.emit.assert_called_once_with('TWO OK')
+
+ @patch('helpers.is_cassandra_running')
+ @patch('helpers.backoff')
+ @patch('subprocess.check_output')
+ def test_nodetool_retry(self, check_output, backoff, is_running):
+ backoff.return_value = repeat(True)
+ is_running.return_value = True
+ check_output.side_effect = iter([
+ subprocess.CalledProcessError([], 1, 'fail 1'),
+ subprocess.CalledProcessError([], 1, 'fail 2'),
+ subprocess.CalledProcessError([], 1, 'fail 3'),
+ subprocess.CalledProcessError([], 1, 'fail 4'),
+ subprocess.CalledProcessError([], 1, 'fail 5'),
+ 'OK'])
+ self.assertEqual(helpers.nodetool('status'), 'OK')
+
+ # Later fails and final output was emitted.
+ helpers.emit.assert_has_calls([call('fail 5'), call('OK')])
+
+ @patch('helpers.get_bootstrapped_ips')
+ def test_num_nodes(self, bootstrapped_ips):
+ bootstrapped_ips.return_value = ['10.0.0.1', '10.0.0.2']
+ self.assertEqual(helpers.num_nodes(), 2)
+
+ @patch('helpers.get_cassandra_yaml_file')
+ def test_read_cassandra_yaml(self, get_cassandra_yaml_file):
+ with tempfile.NamedTemporaryFile('w') as f:
+ f.write('a: one')
+ f.flush()
+ get_cassandra_yaml_file.return_value = f.name
+ self.assertDictEqual(helpers.read_cassandra_yaml(),
+ dict(a='one'))
+
+ @patch('helpers.get_cassandra_yaml_file')
+ def test_write_cassandra_yaml(self, get_cassandra_yaml_file):
+ with tempfile.NamedTemporaryFile() as f:
+ get_cassandra_yaml_file.return_value = f.name
+ helpers.write_cassandra_yaml([1, 2, 3])
+ with open(f.name, 'r') as f2:
+ self.assertEqual(f2.read(), '[1, 2, 3]\n')
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.get_cassandra_yaml_file')
+ @patch('helpers.get_seed_ips')
+ @patch('charmhelpers.core.host.write_file')
+ def test_configure_cassandra_yaml_20(self, write_file, seed_ips, yaml_file,
+ get_cassandra_version):
+ get_cassandra_version.return_value = '2.0'
+ hookenv.config().update(dict(num_tokens=128,
+ cluster_name='test_cluster_name',
+ partitioner='test_partitioner'))
+
+ seed_ips.return_value = ['10.20.0.1', '10.20.0.2', '10.20.0.3']
+
+ existing_config = '''
+ seed_provider:
+ - class_name: blah.SimpleSeedProvider
+ parameters:
+ - seeds: 127.0.0.1 # Comma separated list.
+ '''
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yaml_config = os.path.join(tmpdir, 'c.yaml')
+ yaml_file.return_value = yaml_config
+ with open(yaml_config, 'w', encoding='UTF-8') as f:
+ f.write(existing_config)
+
+ helpers.configure_cassandra_yaml()
+
+ self.assertEqual(write_file.call_count, 2)
+ new_config = write_file.call_args[0][1]
+
+ expected_config = dedent('''\
+ cluster_name: test_cluster_name
+ authenticator: PasswordAuthenticator
+ num_tokens: 128
+ partitioner: test_partitioner
+ listen_address: 10.20.0.1
+ rpc_address: 0.0.0.0
+ rpc_port: 9160
+ native_transport_port: 9042
+ storage_port: 7000
+ ssl_storage_port: 7001
+ authorizer: AllowAllAuthorizer
+ seed_provider:
+ - class_name: blah.SimpleSeedProvider
+ parameters:
+ # No whitespace in seeds is important.
+ - seeds: '10.20.0.1,10.20.0.2,10.20.0.3'
+ endpoint_snitch: GossipingPropertyFileSnitch
+ data_file_directories:
+ - /var/lib/cassandra/data
+ commitlog_directory: /var/lib/cassandra/commitlog
+ saved_caches_directory: /var/lib/cassandra/saved_caches
+ compaction_throughput_mb_per_sec: 16
+ stream_throughput_outbound_megabits_per_sec: 200
+ tombstone_warn_threshold: 1000
+ tombstone_failure_threshold: 100000
+ start_rpc: true
+ ''')
+ self.maxDiff = None
+ self.assertEqual(yaml.safe_load(new_config),
+ yaml.safe_load(expected_config))
+
+ # Confirm we can use an explicit cluster_name too.
+ write_file.reset_mock()
+ hookenv.config()['cluster_name'] = 'fubar'
+ helpers.configure_cassandra_yaml()
+ new_config = write_file.call_args[0][1]
+ self.assertEqual(yaml.safe_load(new_config)['cluster_name'],
+ 'fubar')
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.get_cassandra_yaml_file')
+ @patch('helpers.get_seed_ips')
+ @patch('charmhelpers.core.host.write_file')
+ def test_configure_cassandra_yaml_22(self, write_file, seed_ips, yaml_file,
+ get_cassandra_version):
+ get_cassandra_version.return_value = '2.0'
+ hookenv.config().update(dict(num_tokens=128,
+ cluster_name='test_cluster_name',
+ partitioner='test_partitioner'))
+
+ seed_ips.return_value = ['10.20.0.1', '10.20.0.2', '10.20.0.3']
+
+ existing_config = '''
+ seed_provider:
+ - class_name: blah.SimpleSeedProvider
+ parameters:
+ - seeds: 127.0.0.1 # Comma separated list.
+ start_rpc: false # Defaults to False starting 2.2
+ '''
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yaml_config = os.path.join(tmpdir, 'c.yaml')
+ yaml_file.return_value = yaml_config
+ with open(yaml_config, 'w', encoding='UTF-8') as f:
+ f.write(existing_config)
+
+ helpers.configure_cassandra_yaml()
+
+ self.assertEqual(write_file.call_count, 2)
+ new_config = write_file.call_args[0][1]
+
+ expected_config = dedent('''\
+ start_rpc: true
+ cluster_name: test_cluster_name
+ authenticator: PasswordAuthenticator
+ num_tokens: 128
+ partitioner: test_partitioner
+ listen_address: 10.20.0.1
+ rpc_address: 0.0.0.0
+ rpc_port: 9160
+ native_transport_port: 9042
+ storage_port: 7000
+ ssl_storage_port: 7001
+ authorizer: AllowAllAuthorizer
+ seed_provider:
+ - class_name: blah.SimpleSeedProvider
+ parameters:
+ # No whitespace in seeds is important.
+ - seeds: '10.20.0.1,10.20.0.2,10.20.0.3'
+ endpoint_snitch: GossipingPropertyFileSnitch
+ data_file_directories:
+ - /var/lib/cassandra/data
+ commitlog_directory: /var/lib/cassandra/commitlog
+ saved_caches_directory: /var/lib/cassandra/saved_caches
+ compaction_throughput_mb_per_sec: 16
+ stream_throughput_outbound_megabits_per_sec: 200
+ tombstone_warn_threshold: 1000
+ tombstone_failure_threshold: 100000
+ ''')
+ self.maxDiff = None
+ self.assertEqual(yaml.safe_load(new_config),
+ yaml.safe_load(expected_config))
+
+ # Confirm we can use an explicit cluster_name too.
+ write_file.reset_mock()
+ hookenv.config()['cluster_name'] = 'fubar'
+ helpers.configure_cassandra_yaml()
+ new_config = write_file.call_args[0][1]
+ self.assertEqual(yaml.safe_load(new_config)['cluster_name'],
+ 'fubar')
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.get_cassandra_yaml_file')
+ @patch('helpers.get_seed_ips')
+ @patch('charmhelpers.core.host.write_file')
+ def test_configure_cassandra_yaml(self, write_file, seed_ips,
+ yaml_file, get_cassandra_version):
+ get_cassandra_version.return_value = '2.1'
+ hookenv.config().update(dict(num_tokens=128,
+ cluster_name='test_cluster_name',
+ partitioner='test_partitioner'))
+
+ seed_ips.return_value = ['10.20.0.1', '10.20.0.2', '10.20.0.3']
+
+ existing_config = '''
+ seed_provider:
+ - class_name: blah.SimpleSeedProvider
+ parameters:
+ - seeds: 127.0.0.1 # Comma separated list.
+ '''
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yaml_config = os.path.join(tmpdir, 'c.yaml')
+ yaml_file.return_value = yaml_config
+ with open(yaml_config, 'w', encoding='UTF-8') as f:
+ f.write(existing_config)
+
+ helpers.configure_cassandra_yaml()
+
+ self.assertEqual(write_file.call_count, 2)
+ new_config = write_file.call_args[0][1]
+
+ expected_config = dedent('''\
+ cluster_name: test_cluster_name
+ authenticator: PasswordAuthenticator
+ num_tokens: 128
+ partitioner: test_partitioner
+ listen_address: 10.20.0.1
+ rpc_address: 0.0.0.0
+ broadcast_rpc_address: 10.30.0.1
+ start_rpc: true
+ rpc_port: 9160
+ native_transport_port: 9042
+ storage_port: 7000
+ ssl_storage_port: 7001
+ authorizer: AllowAllAuthorizer
+ seed_provider:
+ - class_name: blah.SimpleSeedProvider
+ parameters:
+ # No whitespace in seeds is important.
+ - seeds: '10.20.0.1,10.20.0.2,10.20.0.3'
+ endpoint_snitch: GossipingPropertyFileSnitch
+ data_file_directories:
+ - /var/lib/cassandra/data
+ commitlog_directory: /var/lib/cassandra/commitlog
+ saved_caches_directory: /var/lib/cassandra/saved_caches
+ compaction_throughput_mb_per_sec: 16
+ stream_throughput_outbound_megabits_per_sec: 200
+ tombstone_warn_threshold: 1000
+ tombstone_failure_threshold: 100000
+ ''')
+ self.maxDiff = None
+ self.assertEqual(yaml.safe_load(new_config),
+ yaml.safe_load(expected_config))
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.get_cassandra_yaml_file')
+ @patch('helpers.get_seed_ips')
+ @patch('charmhelpers.core.host.write_file')
+ def test_configure_cassandra_yaml_overrides(self, write_file, seed_ips,
+ yaml_file, version):
+ version.return_value = '2.1'
+ hookenv.config().update(dict(num_tokens=128,
+ cluster_name=None,
+ partitioner='my_partitioner'))
+
+ seed_ips.return_value = ['10.20.0.1', '10.20.0.2', '10.20.0.3']
+
+ existing_config = dedent('''\
+ seed_provider:
+ - class_name: blah.blah.SimpleSeedProvider
+ parameters:
+ - seeds: 127.0.0.1 # Comma separated list.
+ ''')
+ overrides = dict(partitioner='overridden_partitioner')
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yaml_config = os.path.join(tmpdir, 'c.yaml')
+ yaml_file.return_value = yaml_config
+ with open(yaml_config, 'w', encoding='UTF-8') as f:
+ f.write(existing_config)
+
+ helpers.configure_cassandra_yaml(overrides=overrides)
+
+ self.assertEqual(write_file.call_count, 2)
+ new_config = write_file.call_args[0][1]
+
+ self.assertEqual(yaml.safe_load(new_config)['partitioner'],
+ 'overridden_partitioner')
+
+ def test_get_pid_from_file(self):
+ with tempfile.NamedTemporaryFile('w') as pid_file:
+ pid_file.write(' 42\t')
+ pid_file.flush()
+ self.assertEqual(helpers.get_pid_from_file(pid_file.name), 42)
+ pid_file.write('\nSome Noise')
+ pid_file.flush()
+ self.assertEqual(helpers.get_pid_from_file(pid_file.name), 42)
+
+ for invalid_pid in ['-1', '0', 'fred']:
+ with self.subTest(invalid_pid=invalid_pid):
+ with tempfile.NamedTemporaryFile('w') as pid_file:
+ pid_file.write(invalid_pid)
+ pid_file.flush()
+ self.assertRaises(ValueError,
+ helpers.get_pid_from_file, pid_file.name)
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ self.assertRaises(OSError, helpers.get_pid_from_file,
+ os.path.join(tmpdir, 'invalid.pid'))
+
+ @patch('helpers.get_cassandra_pid_file')
+ def test_is_cassandra_running_not_running(self, get_pid_file):
+ # When Cassandra is not running, the pidfile does not exist.
+ get_pid_file.return_value = 'does not exist'
+ self.assertFalse(helpers.is_cassandra_running())
+
+ @patch('os.path.exists')
+ @patch('helpers.get_pid_from_file')
+ def test_is_cassandra_running_invalid_pid(self, get_pid_from_file, exists):
+ # get_pid_from_file raises a ValueError if the pid is illegal.
+ get_pid_from_file.side_effect = repeat(ValueError('Whoops'))
+ exists.return_value = True # The pid file is there, just insane.
+
+ # is_cassandra_running() fails hard in this case, since we
+ # cannot safely continue when the system is insane.
+ self.assertRaises(ValueError, helpers.is_cassandra_running)
+
+ @patch('os.kill')
+ @patch('os.path.exists')
+ @patch('helpers.get_pid_from_file')
+ def test_is_cassandra_running_missing_process(self, get_pid_from_file,
+ exists, kill):
+ # get_pid_from_file raises a ValueError if the pid is illegal.
+ get_pid_from_file.return_value = sentinel.pid_file
+ exists.return_value = True # The pid file is there
+ kill.side_effect = repeat(ProcessLookupError()) # The process isn't
+ self.assertFalse(helpers.is_cassandra_running())
+
+ @patch('os.kill')
+ @patch('os.path.exists')
+ @patch('helpers.get_pid_from_file')
+ def test_is_cassandra_running_wrong_user(self, get_pid_from_file,
+ exists, kill):
+ # get_pid_from_file raises a ValueError if the pid is illegal.
+ get_pid_from_file.return_value = sentinel.pid_file
+ exists.return_value = True # The pid file is there
+ kill.side_effect = repeat(PermissionError()) # But the process isn't
+ self.assertRaises(PermissionError, helpers.is_cassandra_running)
+
+ @patch('time.sleep')
+ @patch('os.kill')
+ @patch('helpers.get_pid_from_file')
+ @patch('subprocess.call')
+ def test_is_cassandra_running_starting_up(self, call, get_pid_from_file,
+ kill, sleep):
+ sleep.return_value = None # Don't actually sleep in unittests.
+ os.kill.return_value = True # There is a running pid.
+ get_pid_from_file.return_value = 42
+ subprocess.call.side_effect = iter([3, 2, 1, 0]) # 4th time the charm
+ self.assertTrue(helpers.is_cassandra_running())
+
+ @patch('helpers.backoff')
+ @patch('os.kill')
+ @patch('subprocess.call')
+ @patch('helpers.get_pid_from_file')
+ def test_is_cassandra_running_shutting_down(self, get_pid_from_file,
+ call, kill, backoff):
+ # If Cassandra is in the process of shutting down, it might take
+ # several failed checks before the pid file disappears.
+ backoff.return_value = repeat(True)
+ os.kill.return_value = None # The process is running
+ call.return_value = 1 # But nodetool is not succeeding.
+
+ # Fourth time, the pid file is gone.
+ get_pid_from_file.side_effect = iter([42, 42, 42,
+ FileNotFoundError('Whoops')])
+ self.assertFalse(helpers.is_cassandra_running())
+
+ @patch('os.kill')
+ @patch('subprocess.call')
+ @patch('os.path.exists')
+ @patch('helpers.get_pid_from_file')
+ def test_is_cassandra_running_failsafe(self, get_pid_from_file,
+ exists, subprocess_call, kill):
+ get_pid_from_file.return_value = sentinel.pid_file
+ exists.return_value = True # The pid file is there
+ subprocess_call.side_effect = repeat(RuntimeError('whoops'))
+ # Weird errors are reraised.
+ self.assertRaises(RuntimeError, helpers.is_cassandra_running)
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.query')
+ def test_get_auth_keyspace_replication(self, query, ver):
+ ver.return_value = '2.2'
+ query.return_value = [('{"json": true}',)]
+ settings = helpers.get_auth_keyspace_replication(sentinel.session)
+ self.assertDictEqual(settings, dict(json=True))
+ query.assert_called_once_with(
+ sentinel.session, dedent('''\
+ SELECT strategy_options FROM system.schema_keyspaces
+ WHERE keyspace_name='system_auth'
+ '''), ConsistencyLevel.QUORUM)
+
+ @patch('helpers.get_cassandra_version')
+ @patch('helpers.query')
+ def test_get_auth_keyspace_replication_30(self, query, ver):
+ ver.return_value = '3.0'
+ query.return_value = [({"json": True},)] # Decoded under 3.0
+ settings = helpers.get_auth_keyspace_replication(sentinel.session)
+ self.assertDictEqual(settings, dict(json=True))
+ query.assert_called_once_with(
+ sentinel.session, dedent('''\
+ SELECT replication FROM system_schema.keyspaces
+ WHERE keyspace_name='system_auth'
+ '''), ConsistencyLevel.QUORUM)
+
+ @patch('helpers.status_set')
+ @patch('charmhelpers.core.hookenv.status_get')
+ @patch('helpers.query')
+ def test_set_auth_keyspace_replication(self, query,
+ status_get, status_set):
+ status_get.return_value = ('active', '')
+ settings = dict(json=True)
+ helpers.set_auth_keyspace_replication(sentinel.session, settings)
+ query.assert_called_once_with(sentinel.session,
+ 'ALTER KEYSPACE system_auth '
+ 'WITH REPLICATION = %s',
+ ConsistencyLevel.ALL, (settings,))
+
+ @patch('helpers.status_set')
+ @patch('charmhelpers.core.hookenv.status_get')
+ @patch('helpers.nodetool')
+ def test_repair_auth_keyspace(self, nodetool, status_get, status_set):
+ status_get.return_value = (sentinel.status, '')
+ helpers.repair_auth_keyspace()
+ status_set.assert_called_once_with(sentinel.status,
+ 'Repairing system_auth keyspace')
+ # The repair operation may still fail, and I am currently regularly
+ # seeing 'snapshot creation' errors. Repair also takes ages with
+ # Cassandra 2.0. So retry until success, up to 1 hour.
+ nodetool.assert_called_once_with('repair', 'system_auth', timeout=3600)
+
+ def test_is_bootstrapped(self):
+ self.assertFalse(helpers.is_bootstrapped())
+ helpers.set_bootstrapped()
+ self.assertTrue(helpers.is_bootstrapped())
+
+ @patch('helpers.get_node_status')
+ def test_is_decommissioned(self, get_node_status):
+ get_node_status.return_value = 'DECOMMISSIONED'
+ self.assertTrue(helpers.is_decommissioned())
+ get_node_status.return_value = 'LEAVING'
+ self.assertTrue(helpers.is_decommissioned())
+ get_node_status.return_value = 'NORMAL'
+ self.assertFalse(helpers.is_decommissioned())
+
+ @patch('helpers.nodetool')
+ def test_emit_describe_cluster(self, nodetool):
+ helpers.emit_describe_cluster()
+ nodetool.assert_called_once_with('describecluster')
+
+ @patch('helpers.nodetool')
+ def test_emit_status(self, nodetool):
+ helpers.emit_status()
+ nodetool.assert_called_once_with('status')
+
+ @patch('helpers.nodetool')
+ def test_emit_netstats(self, nodetool):
+ helpers.emit_netstats()
+ nodetool.assert_called_once_with('netstats')
+
+ def test_week_spread(self):
+ # The first seven units run midnight on different days.
+ for i in range(0, 7): # There is no unit 0
+ with self.subTest(unit=i):
+ self.assertTupleEqual(helpers.week_spread(i), (i, 0, 0))
+
+ # The next seven units run midday on different days.
+ for i in range(7, 14):
+ with self.subTest(unit=i):
+ self.assertTupleEqual(helpers.week_spread(i), (i - 7, 12, 0))
+
+ # And the next seven units at 6 am on different days.
+ for i in range(14, 21):
+ with self.subTest(unit=i):
+ self.assertTupleEqual(helpers.week_spread(i), (i - 14, 6, 0))
+
+ # This keeps going as best we can, subdividing the hours.
+ self.assertTupleEqual(helpers.week_spread(811), (6, 19, 18))
+
+ # The granularity is 1 minute, so eventually we wrap after about
+ # 7000 units.
+ self.assertTupleEqual(helpers.week_spread(0), (0, 0, 0))
+ for i in range(1, 7168):
+ with self.subTest(unit=i):
+ self.assertNotEqual(helpers.week_spread(i), (0, 0, 0))
+ self.assertTupleEqual(helpers.week_spread(7168), (0, 0, 0))
+
+ def test_local_plugins_dir(self):
+ self.assertEqual(helpers.local_plugins_dir(),
+ '/usr/local/lib/nagios/plugins')
+
+ def test_update_hosts_file_new_entry(self):
+ org = dedent("""\
+ 127.0.0.1 localhost
+ 10.0.1.2 existing
+ """)
+ new = dedent("""\
+ 127.0.0.1 localhost
+ 10.0.1.2 existing
+ 10.0.1.3 newname
+ """)
+ with tempfile.NamedTemporaryFile(mode='w') as f:
+ f.write(org)
+ f.flush()
+ m = {'10.0.1.3': 'newname'}
+ helpers.update_hosts_file(f.name, m)
+ self.assertEqual(new.strip(), open(f.name, 'r').read().strip())
+
+ def test_update_hosts_file_changed_entry(self):
+ org = dedent("""\
+ 127.0.0.1 localhost
+ 10.0.1.2 existing
+ """)
+ new = dedent("""\
+ 127.0.0.1 localhost
+ 10.0.1.3 existing
+ """)
+ with tempfile.NamedTemporaryFile(mode='w') as f:
+ f.write(org)
+ f.flush()
+ m = {'10.0.1.3': 'existing'}
+ helpers.update_hosts_file(f.name, m)
+ self.assertEqual(new.strip(), open(f.name, 'r').read().strip())
+
+
+class TestIsLxc(unittest.TestCase):
+ def test_is_lxc(self):
+ # Test the function runs under the current environmnet.
+ # Unfortunately we can't sanely test that it is returning the
+ # correct value
+ helpers.is_lxc()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/charms/trusty/cassandra/tests/test_integration.py b/charms/trusty/cassandra/tests/test_integration.py
new file mode 100755
index 0000000..8d91bce
--- /dev/null
+++ b/charms/trusty/cassandra/tests/test_integration.py
@@ -0,0 +1,620 @@
+#!.venv3/bin/python3
+#
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import configparser
+from functools import wraps
+import glob
+import http.server
+from itertools import count
+import logging
+import multiprocessing
+import os
+import socket
+import subprocess
+import sys
+import time
+import unittest
+import uuid
+import warnings
+
+warnings.filterwarnings('ignore', 'The blist library is not available')
+
+import amulet.deployer
+import amulet.helpers
+from cassandra import Unavailable, ConsistencyLevel, AuthenticationFailed
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable
+from cassandra.query import SimpleStatement
+import yaml
+
+import helpers
+from testing.amuletfixture import AmuletFixture
+
+
+SERIES = os.environ.get('SERIES', 'trusty')
+
+WAIT_TIMEOUT = int(os.environ.get('AMULET_TIMEOUT', 3600))
+
+ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
+
+
+class TestDeploymentBase(unittest.TestCase):
+ rf = 1
+ deployment = None
+
+ common_config = dict(max_heap_size='96M',
+ heap_newsize='4M')
+ test_config = dict()
+
+ @classmethod
+ def setUpClass(cls):
+ deployment = AmuletFixture(series=SERIES)
+ deployment.setUp()
+ cls.deployment = deployment
+
+ deployment.add('cassandra', units=cls.rf,
+ constraints=dict(mem="2G"))
+ deployment.expose('cassandra') # Tests need client access.
+ config = dict()
+ config.update(cls.common_config)
+ config.update(cls.test_config) # Test subclass overrides
+ deployment.configure('cassandra', config)
+
+ deployment.add('storage',
+ 'cs:~stub/{}/storage'.format(SERIES))
+ deployment.configure('storage', dict(provider='local'))
+
+ # A stub client charm.
+ empty_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir, 'lib',
+ 'testcharms', 'empty'))
+ deployment.add('client', empty_path)
+ deployment.relate('cassandra:database', 'client:database')
+ deployment.relate('cassandra:database-admin', 'client:database-admin')
+
+ # No official trusty branch of the nrpe-external-master charm, yet.
+ # This is a problem as it means tests may not be running against
+ # the lastest version.
+ deployment.add('nrpe',
+ 'cs:~stub/{}/nrpe-external-master'
+ ''.format(SERIES))
+ deployment.relate('cassandra:nrpe-external-master',
+ 'nrpe:nrpe-external-master')
+
+ deployment.deploy(timeout=WAIT_TIMEOUT)
+
+ # Silence noise - we are testing the charm, not the Cassandra
+ # driver.
+ cassandra_log = logging.getLogger('cassandra')
+ cassandra_log.setLevel(logging.CRITICAL)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.deployment.tearDown()
+ cls.deployment = None
+
+ def juju_status(self):
+ status_yaml = subprocess.check_output(['juju', 'status',
+ '--format=yaml'])
+ if not status_yaml.strip():
+ return None
+ return yaml.safe_load(status_yaml)
+
+ def cluster(self, username=None, password=None, hosts=None, port=9042):
+ status = self.juju_status()
+
+ if username is None or password is None:
+ # Get some valid credentials - unit's superuser account will do.
+ unit = sorted(status['services']['cassandra']['units'].keys())[0]
+ cqlshrc_path = helpers.get_cqlshrc_path()
+ cqlshrc = configparser.ConfigParser(interpolation=None)
+ cqlshrc.read_string(
+ self.deployment.sentry[unit].file_contents(cqlshrc_path))
+ username = cqlshrc['authentication']['username']
+ password = cqlshrc['authentication']['password']
+
+ auth_provider = PlainTextAuthProvider(username=username,
+ password=password)
+
+ if hosts is None:
+ # Get the IP addresses
+ hosts = []
+ for unit, d in status['services']['cassandra']['units'].items():
+ hosts.append(d['public-address'])
+ cluster = Cluster(hosts, auth_provider=auth_provider, port=port)
+ self.addCleanup(cluster.shutdown)
+ return cluster
+
+ def session(self):
+ '''A session using the server's superuser credentials.'''
+ session = self.cluster().connect()
+ self.addCleanup(session.shutdown)
+ return session
+
+ def client_session(self, relname):
+ '''A session using the client's credentials.
+
+ We currently just steal the client's credentials and use
+ them from the local machine, but we could tunnel through the
+ client with a little more effort.
+ '''
+ relinfo = self.get_client_relinfo(relname)
+ self.assertIn('host', relinfo.keys())
+ cluster = self.cluster(relinfo['username'],
+ relinfo['password'],
+ [relinfo['host']],
+ int(relinfo['native_transport_port']))
+ session = cluster.connect()
+ self.addCleanup(session.shutdown)
+ return session
+
+ keyspace_ids = count()
+
+ def new_keyspace(self, session, rf=None):
+ if rf is None:
+ # We create a keyspace with a replication factor equal
+ # to the number of units. This ensures that all records
+ # are replicated to all nodes, and we can cofirm that
+ # all nodes are working by doing an insert with
+ # ConsistencyLevel.ALL.
+ rf = self.rf
+ keyspace = 'test{}'.format(next(TestDeploymentBase.keyspace_ids))
+ q = SimpleStatement(
+ 'CREATE KEYSPACE {} WITH REPLICATION ='.format(keyspace) +
+ "{'class': 'SimpleStrategy', 'replication_factor': %s}",
+ consistency_level=ConsistencyLevel.ALL)
+ session.execute(q, (rf,))
+ session.set_keyspace(keyspace)
+ return keyspace
+
+ def get_client_relinfo(self, relname):
+ # We only need one unit, even if rf > 1
+ s = self.deployment.sentry['cassandra'][0]
+ relinfo = s.relation(relname, 'client:{}'.format(relname))
+ return relinfo
+
+ def is_port_open(self, port):
+ status = self.juju_status()
+ detail = list(status['services']['cassandra']['units'].values())[0]
+ address = detail['public-address']
+ rc = subprocess.call(['nc', '-z', '-w', '2', address, str(port)])
+ return rc == 0
+
+ def reconfigure_cassandra(self, **overrides):
+ config = dict()
+ config.update(self.common_config)
+ config.update(self.test_config)
+ config.update(overrides)
+ self.deployment.configure('cassandra', config)
+ self.deployment.wait()
+
+
+class Test1UnitDeployment(TestDeploymentBase):
+ """Tests run on both a single node cluster and a 3 node cluster."""
+ rf = 1
+ test_config = dict(jre='openjdk')
+
+ def test_basics_unit_superuser(self):
+ # Basic tests using unit superuser credentials
+ session = self.session()
+ self.new_keyspace(session)
+ self._test_database_basics(session)
+
+ def test_basics_client_relation(self):
+ # Create a keyspace using superuser credentials
+ super_session = self.session()
+ keyspace = self.new_keyspace(super_session)
+
+ # Basic tests using standard client relation credentials.
+ session = self.client_session('database')
+ session.set_keyspace(keyspace)
+ self._test_database_basics(session)
+
+ def test_basics_client_admin_relation(self):
+ # Basic tests using administrative client relation credentials.
+ session = self.client_session('database-admin')
+ self.new_keyspace(session)
+ self._test_database_basics(session)
+
+ def _test_database_basics(self, session):
+ session.execute('CREATE TABLE Foo (x varchar PRIMARY KEY)')
+
+ # Insert some data, ensuring that it has been stored on
+ # all of our juju units. Note that the replication factor
+ # of our keyspace has been set to the number of units we
+ # deployed. Because it might take a while for the cluster to get
+ # its act together, we retry this in a loop with a timeout.
+ timeout = time.time() + 120
+ while True:
+ value = 'hello {}'.format(time.time())
+ query = SimpleStatement(
+ "INSERT INTO Foo (x) VALUES (%s)",
+ consistency_level=ConsistencyLevel.ALL)
+ try:
+ session.execute(query, (value,))
+ break
+ except Exception:
+ if time.time() > timeout:
+ raise
+
+ # We can get the data out again. This isn't testing our charm,
+ # but nice to know anyway...
+ r = session.execute('SELECT * FROM Foo LIMIT 1')
+ self.assertTrue(r[0].x.startswith('hello'))
+
+ def test_external_mount(self):
+ # Not only does this test migrating data from local disk to an
+ # external mount, it also exercises the rolling restart logic.
+ # If rf==1, the restart will happen in the
+ # storage-relation-changed hook as soon as the mount is ready.
+ # If rf > 1, the restart will happen in the
+ # cluster-relation-changed hook once the unit has determined
+ # that it is its turn to restart.
+
+ # First, create a keyspace pre-migration so we can confirm the
+ # data was migrated rather than being reset to an empty system.
+ session = self.session()
+ keyspace = self.new_keyspace(session)
+ session.execute('CREATE TABLE dat (x varchar PRIMARY KEY)')
+ total = self.rf * 50
+ q = SimpleStatement('INSERT INTO dat (x) VALUES (%s)')
+ for _ in range(0, total):
+ session.execute(q, (str(uuid.uuid1()),))
+ session.shutdown()
+
+ self.deployment.relate('cassandra:data', 'storage:data')
+ self.deployment.wait()
+ # Per Bug #1254766 and Bug #1254766, the sentry.wait() above
+ # will return before the hooks have actually finished running
+ # and data migrated. Instead, keep checking until our condition
+ # is met, or a timeout reached.
+ timeout = time.time() + 300
+ for s in self.deployment.sentry['cassandra']:
+ unit = s.info['unit_name']
+ unit_num = s.info['unit']
+ with self.subTest(unit=unit):
+ while True:
+ # Attempting to diagnose Amulet failures. I suspect
+ # SSH host keys again, per Bug #802117
+ try:
+ s.directory_contents('/')
+ except (subprocess.CalledProcessError, OSError):
+ self.skipTest('sentry[{!r}].directory_contents({!r}) '
+ 'failed!'.format(unit, '/'))
+ parents = ['/srv', '/srv/cassandra_{}'.format(unit_num),
+ '/srv/cassandra_{}/cassandra'.format(unit_num)]
+ for path in parents:
+ try:
+ s.directory_contents('/srv')
+ except (subprocess.CalledProcessError, OSError):
+ raise AssertionError('Failed to scan {!r} on {}'
+ .format(path, unit))
+ try:
+ contents = s.directory_contents(
+ '/srv/cassandra_{}/cassandra/data'.format(
+ unit_num))
+ found = set(contents['directories'])
+ self.assertIn(keyspace, found)
+ self.assertIn('system', found)
+ break
+ except Exception:
+ if time.time() > timeout:
+ raise
+ time.sleep(5)
+
+ # Confirm no data has been lost, which could happen if we badly
+ # shutdown and memtables were not flushed.
+ session = self.session()
+ session.set_keyspace(keyspace)
+ q = SimpleStatement('SELECT COUNT(*) FROM dat',
+ consistency_level=ConsistencyLevel.QUORUM)
+ results = session.execute(q)
+ self.assertEqual(results[0][0], total)
+
+ def test_cluster_ports_closed(self):
+ # The internal Cassandra ports are protected by ufw firewall
+ # rules, and are closed to everyone except for peers and the
+ # force_seed_nodes list. This is required for security, since
+ # the protocols are unauthenticated. It also stops rogue nodes
+ # on failed units from rejoining the cluster and causing chaos.
+ self.assertFalse(self.is_port_open(7000), 'Storage port open')
+ self.assertFalse(self.is_port_open(7001), 'SSL Storage port open')
+ self.assertFalse(self.is_port_open(7199), 'JMX port open')
+
+ def test_client_ports_open(self):
+ self.assertTrue(self.is_port_open(9042), 'Native trans port closed')
+ self.assertTrue(self.is_port_open(9160), 'Thrift RPC port closed')
+
+ def test_default_superuser_account_closed(self):
+ cluster = self.cluster(username='cassandra', password='cassandra')
+ try:
+ cluster.connect()
+ self.fail('Default credentials not reset')
+ except NoHostAvailable as x:
+ for fail in x.errors.values():
+ self.assertIsInstance(fail, AuthenticationFailed)
+
+ def test_cqlsh(self):
+ unit = self.deployment.sentry['cassandra'][0].info['unit_name']
+ subprocess.check_output(['juju', 'ssh', unit,
+ 'sudo -H cqlsh -e exit'],
+ stderr=subprocess.STDOUT)
+
+ def test_z_add_and_drop_node(self): # 'z' to run this test last.
+ # We need to be able to add a node correctly into the ring,
+ # without an operator needing to repair keyspaces to ensure data
+ # is located on the expected nodes.
+ # To test this, first create a keyspace with rf==1 and put enough
+ # data in it so each node will have some.
+ cluster = self.cluster()
+ s = cluster.connect()
+ keyspace = self.new_keyspace(s, rf=1)
+ q = SimpleStatement('CREATE TABLE dat (x varchar PRIMARY KEY)',
+ consistency_level=ConsistencyLevel.ALL)
+ s.execute(q)
+
+ total = self.rf * 50
+ q = SimpleStatement('INSERT INTO dat (x) VALUES (%s)',
+ consistency_level=ConsistencyLevel.QUORUM)
+ for _ in range(0, total):
+ s.execute(q, (str(uuid.uuid1()),))
+ cluster.shutdown()
+
+ def count(expected):
+ until = time.time() + 180
+ while True:
+ cluster = self.cluster()
+ try:
+ s = cluster.connect(keyspace)
+ results = s.execute(SimpleStatement(
+ 'SELECT count(*) FROM dat',
+ consistency_level=ConsistencyLevel.QUORUM))
+ found = results[0][0]
+ if found == expected or time.time() > until:
+ return found
+ time.sleep(0.2)
+ except Unavailable:
+ if time.time() > until:
+ raise
+ finally:
+ cluster.shutdown()
+
+ self.assertEqual(count(total), total)
+
+ self.deployment.add_unit('cassandra')
+ self.deployment.wait()
+ status = self.juju_status()
+ unit = sorted(status['services']['cassandra']['units'].keys())[-1]
+ try:
+ self.assertEqual(count(total), total)
+ finally:
+ # When a node is dropped, it needs to decommission itself and
+ # move its data to the remaining nodes so no data is lost.
+ # Alas, per Bug #1417874 we can't yet do this with Juju.
+ # First, the node must be manually decommissioned before we
+ # remove the unit.
+ self._decommission(unit)
+ self.deployment.remove_unit(unit)
+ self.deployment.wait()
+
+ self.assertEqual(count(total), total)
+
+ def _decommission(self, unit):
+ until = time.time() + WAIT_TIMEOUT
+ while True:
+ try:
+ subprocess.check_output(['juju', 'run', '--unit', unit,
+ 'nodetool decommission'],
+ stderr=subprocess.STDOUT,
+ universal_newlines=True)
+ break
+ except subprocess.CalledProcessError:
+ if time.time() > until:
+ raise
+
+ until = time.time() + WAIT_TIMEOUT
+ while True:
+ try:
+ cmd = ['juju', 'run', '--unit', unit, 'nodetool netstats']
+ raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
+ universal_newlines=True)
+ if 'Mode: DECOMMISSIONED' in raw:
+ return
+ if time.time() > until:
+ raise subprocess.TimeoutExpired(cmd, WAIT_TIMEOUT, raw)
+ except subprocess.CalledProcessError:
+ if time.time() > until:
+ raise
+ time.sleep(3)
+
+
+class Test3UnitDeployment(Test1UnitDeployment):
+ """Tests run on a three node cluster."""
+ rf = 3
+
+
+_jre_url = None
+
+
+def _serve(cwd, host, port):
+ sys.stderr = open('/dev/null', 'w')
+ os.chdir(cwd)
+ httpd = http.server.HTTPServer((host, port),
+ http.server.SimpleHTTPRequestHandler)
+ httpd.serve_forever()
+
+
+_procs = []
+
+
+def get_jre_url():
+ '''Return the URL to the Oracle Java SE 8 Server Runtime tarball, or None.
+
+ The tarball needs to be placed in ../lib.
+
+ Spawns a web server as a subprocess to serve the file.
+ '''
+ global _jre_url
+ if _jre_url is not None:
+ return _jre_url
+
+ jre_dir = os.path.join(ROOT, 'lib')
+
+ jre_tarballs = sorted(glob.glob(os.path.join(jre_dir,
+ 'server-jre-?u*.tar.gz')))
+ if not jre_tarballs:
+ return None
+
+ # Get the local IP address, only available via hackish means and
+ # quite possibly incorrect.
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.connect(('www.canonical.com', 80))
+ host = s.getsockname()[0]
+ s.close()
+
+ # Get a free port.
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.bind((host, 0))
+ port = s.getsockname()[1]
+ s.close()
+
+ p = multiprocessing.Process(target=_serve, args=(jre_dir, host, port),
+ daemon=True)
+ p.start()
+ _procs.append(p)
+
+ _jre_url = 'http://{}:{}/{}'.format(host, port,
+ os.path.basename(jre_tarballs[-1]))
+ return _jre_url
+
+
+class TestOracleJREDeployment(Test1UnitDeployment):
+ """Basic test with the Oracle JRE.
+
+ Unfortunately these tests cannot be run by the automatic test runners,
+ as the Oracle JRE is protected from public download by Oracle's
+ click-through license agreement.
+ """
+ rf = 1
+ test_config = dict(jre='Oracle', edition='community',
+ private_jre_url=get_jre_url())
+
+ @classmethod
+ @unittest.skipUnless(get_jre_url(), 'No Oracle JRE tarballs available')
+ def setUpClass(cls):
+ super(TestOracleJREDeployment, cls).setUpClass()
+
+
+class TestDSEDeployment(Test1UnitDeployment):
+ """Tests run a single node DataStax Enterprise cluster.
+
+ Unfortunately these tests cannot be run by the automatic test
+ runners, as the DSE packages are not available for public download.
+ """
+ rf = 1
+ test_config = dict(
+ edition='DSE', # Forces Oracle JRE
+ install_sources=yaml.safe_dump([os.environ.get('DSE_SOURCE'),
+ 'ppa:stub/cassandra']),
+ install_keys=yaml.safe_dump([None, None]),
+ private_jre_url=get_jre_url())
+
+ @classmethod
+ @unittest.skipUnless(get_jre_url(), 'No Oracle JRE tarballs available')
+ @unittest.skipIf('DSE_SOURCE' not in os.environ,
+ 'DSE_SOURCE environment variable not configured')
+ def setUpClass(cls):
+ super(TestDSEDeployment, cls).setUpClass()
+
+
+class TestAllowAllAuthenticatorDeployment(Test3UnitDeployment):
+ test_config = dict(authenticator='AllowAllAuthenticator')
+
+ def cluster(self, username=None, password=None, hosts=None, port=9042):
+ '''A cluster using invalid credentials.'''
+ return super(TestAllowAllAuthenticatorDeployment,
+ self).cluster(username='wat', password='eva')
+
+ def client_session(self, relname):
+ '''A session using invalid credentials.'''
+ relinfo = self.get_client_relinfo(relname)
+ self.assertIn('host', relinfo.keys())
+ cluster = self.cluster('random', 'nonsense',
+ [relinfo['host']],
+ int(relinfo['native_transport_port']))
+ session = cluster.connect()
+ self.addCleanup(session.shutdown)
+ return session
+
+ test_default_superuser_account_closed = None
+
+
+class Test20Deployment(Test1UnitDeployment):
+ """Tests run on a single node Apache Cassandra 2.0 cluster.
+ """
+ rf = 1
+ test_config = dict(
+ edition='community',
+ install_sources=yaml.safe_dump([
+ 'ppa:stub/cassandra',
+ 'ppa:openjdk-r/ppa',
+ 'deb http://www.apache.org/dist/cassandra/debian 20x main']),
+ install_keys=yaml.safe_dump([None, None, None]))
+
+
+class Test21Deployment(Test1UnitDeployment):
+ """Tests run on a single node Apache Cassandra 2.1 cluster.
+ """
+ rf = 1
+ test_config = dict(
+ edition='community',
+ install_sources=yaml.safe_dump([
+ 'ppa:stub/cassandra',
+ 'ppa:openjdk-r/ppa',
+ 'deb http://www.apache.org/dist/cassandra/debian 21x main']),
+ install_keys=yaml.safe_dump([None, None, None]))
+
+
+class Test30Deployment(Test1UnitDeployment):
+ """Tests run on a single node Apache Cassandra 3.0 cluster.
+ """
+ rf = 1
+ test_config = dict(
+ edition='community',
+ install_sources=yaml.safe_dump([
+ 'ppa:stub/cassandra',
+ 'ppa:openjdk-r/ppa',
+ 'deb http://www.apache.org/dist/cassandra/debian 30x main']),
+ install_keys=yaml.safe_dump([None, None, None]))
+
+
+# Bug #1417097 means we need to monkey patch Amulet for now.
+real_juju = amulet.helpers.juju
+
+
+@wraps(real_juju)
+def patched_juju(args, env=None):
+ args = [str(a) for a in args]
+ return real_juju(args, env)
+
+amulet.helpers.juju = patched_juju
+amulet.deployer.juju = patched_juju
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/charms/trusty/cassandra/tests/tests.yaml b/charms/trusty/cassandra/tests/tests.yaml
new file mode 100644
index 0000000..fbbd7f0
--- /dev/null
+++ b/charms/trusty/cassandra/tests/tests.yaml
@@ -0,0 +1,15 @@
+bootstrap: true
+reset: false
+tests: ""
+virtualenv: false
+# sources: []
+# packages: []
+makefile:
+ - lint
+ - unittest
+ - Test1UnitDeployment
+ - Test3UnitDeployment
+ - Test20Deployment
+ - Test21Deployment
+ - Test30Deployment
+ - TestAllowAllAuthenticatorDeployment