Diffstat (limited to 'charms/trusty/cassandra/tests/test_integration.py')
-rwxr-xr-x | charms/trusty/cassandra/tests/test_integration.py | 620 |
1 file changed, 620 insertions, 0 deletions
diff --git a/charms/trusty/cassandra/tests/test_integration.py b/charms/trusty/cassandra/tests/test_integration.py
new file mode 100755
index 0000000..8d91bce
--- /dev/null
+++ b/charms/trusty/cassandra/tests/test_integration.py
@@ -0,0 +1,620 @@
+#!.venv3/bin/python3
+#
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import configparser
+from functools import wraps
+import glob
+import http.server
+from itertools import count
+import logging
+import multiprocessing
+import os
+import socket
+import subprocess
+import sys
+import time
+import unittest
+import uuid
+import warnings
+
+warnings.filterwarnings('ignore', 'The blist library is not available')
+
+import amulet.deployer
+import amulet.helpers
+from cassandra import Unavailable, ConsistencyLevel, AuthenticationFailed
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster, NoHostAvailable
+from cassandra.query import SimpleStatement
+import yaml
+
+import helpers
+from testing.amuletfixture import AmuletFixture
+
+
+SERIES = os.environ.get('SERIES', 'trusty')
+
+WAIT_TIMEOUT = int(os.environ.get('AMULET_TIMEOUT', 3600))
+
+ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
+
+
+class TestDeploymentBase(unittest.TestCase):
+    rf = 1
+    deployment = None
+
+    common_config = dict(max_heap_size='96M',
+                         heap_newsize='4M')
+    test_config = dict()
+
+    @classmethod
+    def setUpClass(cls):
+        deployment = AmuletFixture(series=SERIES)
+        deployment.setUp()
+        cls.deployment = deployment
+
+        deployment.add('cassandra', units=cls.rf,
+                       constraints=dict(mem="2G"))
+        deployment.expose('cassandra')  # Tests need client access.
+        config = dict()
+        config.update(cls.common_config)
+        config.update(cls.test_config)  # Test subclass overrides
+        deployment.configure('cassandra', config)
+
+        deployment.add('storage',
+                       'cs:~stub/{}/storage'.format(SERIES))
+        deployment.configure('storage', dict(provider='local'))
+
+        # A stub client charm.
+        empty_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                                  os.pardir, 'lib',
+                                                  'testcharms', 'empty'))
+        deployment.add('client', empty_path)
+        deployment.relate('cassandra:database', 'client:database')
+        deployment.relate('cassandra:database-admin', 'client:database-admin')
+
+        # No official trusty branch of the nrpe-external-master charm, yet.
+        # This is a problem as it means tests may not be running against
+        # the latest version.
+        deployment.add('nrpe',
+                       'cs:~stub/{}/nrpe-external-master'
+                       ''.format(SERIES))
+        deployment.relate('cassandra:nrpe-external-master',
+                          'nrpe:nrpe-external-master')
+
+        deployment.deploy(timeout=WAIT_TIMEOUT)
+
+        # Silence noise - we are testing the charm, not the Cassandra
+        # driver.
+        cassandra_log = logging.getLogger('cassandra')
+        cassandra_log.setLevel(logging.CRITICAL)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.deployment.tearDown()
+        cls.deployment = None
+
+    def juju_status(self):
+        status_yaml = subprocess.check_output(['juju', 'status',
+                                               '--format=yaml'])
+        if not status_yaml.strip():
+            return None
+        return yaml.safe_load(status_yaml)
+
+    def cluster(self, username=None, password=None, hosts=None, port=9042):
+        status = self.juju_status()
+
+        if username is None or password is None:
+            # Get some valid credentials - unit's superuser account will do.
+            unit = sorted(status['services']['cassandra']['units'].keys())[0]
+            cqlshrc_path = helpers.get_cqlshrc_path()
+            cqlshrc = configparser.ConfigParser(interpolation=None)
+            cqlshrc.read_string(
+                self.deployment.sentry[unit].file_contents(cqlshrc_path))
+            username = cqlshrc['authentication']['username']
+            password = cqlshrc['authentication']['password']
+
+        auth_provider = PlainTextAuthProvider(username=username,
+                                              password=password)
+
+        if hosts is None:
+            # Get the IP addresses
+            hosts = []
+            for unit, d in status['services']['cassandra']['units'].items():
+                hosts.append(d['public-address'])
+        cluster = Cluster(hosts, auth_provider=auth_provider, port=port)
+        self.addCleanup(cluster.shutdown)
+        return cluster
+
+    def session(self):
+        '''A session using the server's superuser credentials.'''
+        session = self.cluster().connect()
+        self.addCleanup(session.shutdown)
+        return session
+
+    def client_session(self, relname):
+        '''A session using the client's credentials.
+
+        We currently just steal the client's credentials and use
+        them from the local machine, but we could tunnel through the
+        client with a little more effort.
+        '''
+        relinfo = self.get_client_relinfo(relname)
+        self.assertIn('host', relinfo.keys())
+        cluster = self.cluster(relinfo['username'],
+                               relinfo['password'],
+                               [relinfo['host']],
+                               int(relinfo['native_transport_port']))
+        session = cluster.connect()
+        self.addCleanup(session.shutdown)
+        return session
+
+    keyspace_ids = count()
+
+    def new_keyspace(self, session, rf=None):
+        if rf is None:
+            # We create a keyspace with a replication factor equal
+            # to the number of units. This ensures that all records
+            # are replicated to all nodes, and we can confirm that
+            # all nodes are working by doing an insert with
+            # ConsistencyLevel.ALL.
+            rf = self.rf
+        keyspace = 'test{}'.format(next(TestDeploymentBase.keyspace_ids))
+        q = SimpleStatement(
+            'CREATE KEYSPACE {} WITH REPLICATION ='.format(keyspace) +
+            "{'class': 'SimpleStrategy', 'replication_factor': %s}",
+            consistency_level=ConsistencyLevel.ALL)
+        session.execute(q, (rf,))
+        session.set_keyspace(keyspace)
+        return keyspace
+
+    def get_client_relinfo(self, relname):
+        # We only need one unit, even if rf > 1
+        s = self.deployment.sentry['cassandra'][0]
+        relinfo = s.relation(relname, 'client:{}'.format(relname))
+        return relinfo
+
+    def is_port_open(self, port):
+        status = self.juju_status()
+        detail = list(status['services']['cassandra']['units'].values())[0]
+        address = detail['public-address']
+        rc = subprocess.call(['nc', '-z', '-w', '2', address, str(port)])
+        return rc == 0
+
+    def reconfigure_cassandra(self, **overrides):
+        config = dict()
+        config.update(self.common_config)
+        config.update(self.test_config)
+        config.update(overrides)
+        self.deployment.configure('cassandra', config)
+        self.deployment.wait()
+
+
+class Test1UnitDeployment(TestDeploymentBase):
+    """Tests run on both a single node cluster and a 3 node cluster."""
+    rf = 1
+    test_config = dict(jre='openjdk')
+
+    def test_basics_unit_superuser(self):
+        # Basic tests using unit superuser credentials
+        session = self.session()
+        self.new_keyspace(session)
+        self._test_database_basics(session)
+
+    def test_basics_client_relation(self):
+        # Create a keyspace using superuser credentials
+        super_session = self.session()
+        keyspace = self.new_keyspace(super_session)
+
+        # Basic tests using standard client relation credentials.
+        session = self.client_session('database')
+        session.set_keyspace(keyspace)
+        self._test_database_basics(session)
+
+    def test_basics_client_admin_relation(self):
+        # Basic tests using administrative client relation credentials.
+        session = self.client_session('database-admin')
+        self.new_keyspace(session)
+        self._test_database_basics(session)
+
+    def _test_database_basics(self, session):
+        session.execute('CREATE TABLE Foo (x varchar PRIMARY KEY)')
+
+        # Insert some data, ensuring that it has been stored on
+        # all of our juju units. Note that the replication factor
+        # of our keyspace has been set to the number of units we
+        # deployed. Because it might take a while for the cluster to get
+        # its act together, we retry this in a loop with a timeout.
+        timeout = time.time() + 120
+        while True:
+            value = 'hello {}'.format(time.time())
+            query = SimpleStatement(
+                "INSERT INTO Foo (x) VALUES (%s)",
+                consistency_level=ConsistencyLevel.ALL)
+            try:
+                session.execute(query, (value,))
+                break
+            except Exception:
+                if time.time() > timeout:
+                    raise
+
+        # We can get the data out again. This isn't testing our charm,
+        # but nice to know anyway...
+        r = session.execute('SELECT * FROM Foo LIMIT 1')
+        self.assertTrue(r[0].x.startswith('hello'))
+
+    def test_external_mount(self):
+        # Not only does this test migrating data from local disk to an
+        # external mount, it also exercises the rolling restart logic.
+        # If rf==1, the restart will happen in the
+        # storage-relation-changed hook as soon as the mount is ready.
+        # If rf > 1, the restart will happen in the
+        # cluster-relation-changed hook once the unit has determined
+        # that it is its turn to restart.
+
+        # First, create a keyspace pre-migration so we can confirm the
+        # data was migrated rather than being reset to an empty system.
+        session = self.session()
+        keyspace = self.new_keyspace(session)
+        session.execute('CREATE TABLE dat (x varchar PRIMARY KEY)')
+        total = self.rf * 50
+        q = SimpleStatement('INSERT INTO dat (x) VALUES (%s)')
+        for _ in range(0, total):
+            session.execute(q, (str(uuid.uuid1()),))
+        session.shutdown()
+
+        self.deployment.relate('cassandra:data', 'storage:data')
+        self.deployment.wait()
+        # Per Bug #1254766, the sentry.wait() above will return before
+        # the hooks have actually finished running and the data has
+        # migrated. Instead, keep checking until our condition is met
+        # or a timeout is reached.
+        timeout = time.time() + 300
+        for s in self.deployment.sentry['cassandra']:
+            unit = s.info['unit_name']
+            unit_num = s.info['unit']
+            with self.subTest(unit=unit):
+                while True:
+                    # Attempting to diagnose Amulet failures. I suspect
+                    # SSH host keys again, per Bug #802117.
+                    try:
+                        s.directory_contents('/')
+                    except (subprocess.CalledProcessError, OSError):
+                        self.skipTest('sentry[{!r}].directory_contents({!r}) '
+                                      'failed!'.format(unit, '/'))
+                    parents = ['/srv', '/srv/cassandra_{}'.format(unit_num),
+                               '/srv/cassandra_{}/cassandra'.format(unit_num)]
+                    for path in parents:
+                        try:
+                            s.directory_contents(path)
+                        except (subprocess.CalledProcessError, OSError):
+                            raise AssertionError('Failed to scan {!r} on {}'
+                                                 .format(path, unit))
+                    try:
+                        contents = s.directory_contents(
+                            '/srv/cassandra_{}/cassandra/data'.format(
+                                unit_num))
+                        found = set(contents['directories'])
+                        self.assertIn(keyspace, found)
+                        self.assertIn('system', found)
+                        break
+                    except Exception:
+                        if time.time() > timeout:
+                            raise
+                        time.sleep(5)
+
+        # Confirm no data has been lost, which could happen if we shut
+        # down badly and memtables were not flushed.
+        session = self.session()
+        session.set_keyspace(keyspace)
+        q = SimpleStatement('SELECT COUNT(*) FROM dat',
+                            consistency_level=ConsistencyLevel.QUORUM)
+        results = session.execute(q)
+        self.assertEqual(results[0][0], total)
+
+    def test_cluster_ports_closed(self):
+        # The internal Cassandra ports are protected by ufw firewall
+        # rules, and are closed to everyone except for peers and the
+        # force_seed_nodes list. This is required for security, since
+        # the protocols are unauthenticated. It also stops rogue nodes
+        # on failed units from rejoining the cluster and causing chaos.
+        self.assertFalse(self.is_port_open(7000), 'Storage port open')
+        self.assertFalse(self.is_port_open(7001), 'SSL Storage port open')
+        self.assertFalse(self.is_port_open(7199), 'JMX port open')
+
+    def test_client_ports_open(self):
+        self.assertTrue(self.is_port_open(9042), 'Native trans port closed')
+        self.assertTrue(self.is_port_open(9160), 'Thrift RPC port closed')
+
+    def test_default_superuser_account_closed(self):
+        cluster = self.cluster(username='cassandra', password='cassandra')
+        try:
+            cluster.connect()
+            self.fail('Default credentials not reset')
+        except NoHostAvailable as x:
+            for fail in x.errors.values():
+                self.assertIsInstance(fail, AuthenticationFailed)
+
+    def test_cqlsh(self):
+        unit = self.deployment.sentry['cassandra'][0].info['unit_name']
+        subprocess.check_output(['juju', 'ssh', unit,
+                                 'sudo -H cqlsh -e exit'],
+                                stderr=subprocess.STDOUT)
+
+    def test_z_add_and_drop_node(self):  # 'z' to run this test last.
+        # We need to be able to add a node into the ring correctly,
+        # without an operator needing to repair keyspaces to ensure data
+        # is located on the expected nodes.
+        # To test this, first create a keyspace with rf==1 and put enough
+        # data in it so each node will have some.
+        cluster = self.cluster()
+        s = cluster.connect()
+        keyspace = self.new_keyspace(s, rf=1)
+        q = SimpleStatement('CREATE TABLE dat (x varchar PRIMARY KEY)',
+                            consistency_level=ConsistencyLevel.ALL)
+        s.execute(q)
+
+        total = self.rf * 50
+        q = SimpleStatement('INSERT INTO dat (x) VALUES (%s)',
+                            consistency_level=ConsistencyLevel.QUORUM)
+        for _ in range(0, total):
+            s.execute(q, (str(uuid.uuid1()),))
+        cluster.shutdown()
+
+        def count(expected):
+            until = time.time() + 180
+            while True:
+                cluster = self.cluster()
+                try:
+                    s = cluster.connect(keyspace)
+                    results = s.execute(SimpleStatement(
+                        'SELECT count(*) FROM dat',
+                        consistency_level=ConsistencyLevel.QUORUM))
+                    found = results[0][0]
+                    if found == expected or time.time() > until:
+                        return found
+                    time.sleep(0.2)
+                except Unavailable:
+                    if time.time() > until:
+                        raise
+                finally:
+                    cluster.shutdown()
+
+        self.assertEqual(count(total), total)
+
+        self.deployment.add_unit('cassandra')
+        self.deployment.wait()
+        status = self.juju_status()
+        unit = sorted(status['services']['cassandra']['units'].keys())[-1]
+        try:
+            self.assertEqual(count(total), total)
+        finally:
+            # When a node is dropped, it needs to decommission itself and
+            # move its data to the remaining nodes so no data is lost.
+            # Alas, per Bug #1417874 we can't yet do this with Juju.
+            # First, the node must be manually decommissioned before we
+            # remove the unit.
+            self._decommission(unit)
+            self.deployment.remove_unit(unit)
+            self.deployment.wait()
+
+        self.assertEqual(count(total), total)
+
+    def _decommission(self, unit):
+        until = time.time() + WAIT_TIMEOUT
+        while True:
+            try:
+                subprocess.check_output(['juju', 'run', '--unit', unit,
+                                         'nodetool decommission'],
+                                        stderr=subprocess.STDOUT,
+                                        universal_newlines=True)
+                break
+            except subprocess.CalledProcessError:
+                if time.time() > until:
+                    raise
+
+        until = time.time() + WAIT_TIMEOUT
+        while True:
+            try:
+                cmd = ['juju', 'run', '--unit', unit, 'nodetool netstats']
+                raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
+                                              universal_newlines=True)
+                if 'Mode: DECOMMISSIONED' in raw:
+                    return
+                if time.time() > until:
+                    raise subprocess.TimeoutExpired(cmd, WAIT_TIMEOUT, raw)
+            except subprocess.CalledProcessError:
+                if time.time() > until:
+                    raise
+            time.sleep(3)
+
+
+class Test3UnitDeployment(Test1UnitDeployment):
+    """Tests run on a three node cluster."""
+    rf = 3
+
+
+_jre_url = None
+
+
+def _serve(cwd, host, port):
+    sys.stderr = open('/dev/null', 'w')
+    os.chdir(cwd)
+    httpd = http.server.HTTPServer((host, port),
+                                   http.server.SimpleHTTPRequestHandler)
+    httpd.serve_forever()
+
+
+_procs = []
+
+
+def get_jre_url():
+    '''Return the URL to the Oracle Java SE 8 Server Runtime tarball, or None.
+
+    The tarball needs to be placed in ../lib.
+
+    Spawns a web server as a subprocess to serve the file.
+    '''
+    global _jre_url
+    if _jre_url is not None:
+        return _jre_url
+
+    jre_dir = os.path.join(ROOT, 'lib')
+
+    jre_tarballs = sorted(glob.glob(os.path.join(jre_dir,
+                                                 'server-jre-?u*.tar.gz')))
+    if not jre_tarballs:
+        return None
+
+    # Get the local IP address, only available via hackish means and
+    # quite possibly incorrect.
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    s.connect(('www.canonical.com', 80))
+    host = s.getsockname()[0]
+    s.close()
+
+    # Get a free port.
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind((host, 0))
+    port = s.getsockname()[1]
+    s.close()
+
+    p = multiprocessing.Process(target=_serve, args=(jre_dir, host, port),
+                                daemon=True)
+    p.start()
+    _procs.append(p)
+
+    _jre_url = 'http://{}:{}/{}'.format(host, port,
+                                        os.path.basename(jre_tarballs[-1]))
+    return _jre_url
+
+
+class TestOracleJREDeployment(Test1UnitDeployment):
+    """Basic test with the Oracle JRE.
+
+    Unfortunately these tests cannot be run by the automatic test runners,
+    as the Oracle JRE is protected from public download by Oracle's
+    click-through license agreement.
+    """
+    rf = 1
+    test_config = dict(jre='Oracle', edition='community',
+                       private_jre_url=get_jre_url())
+
+    @classmethod
+    @unittest.skipUnless(get_jre_url(), 'No Oracle JRE tarballs available')
+    def setUpClass(cls):
+        super(TestOracleJREDeployment, cls).setUpClass()
+
+
+class TestDSEDeployment(Test1UnitDeployment):
+    """Tests run on a single node DataStax Enterprise cluster.
+
+    Unfortunately these tests cannot be run by the automatic test
+    runners, as the DSE packages are not available for public download.
+    """
+    rf = 1
+    test_config = dict(
+        edition='DSE',  # Forces Oracle JRE
+        install_sources=yaml.safe_dump([os.environ.get('DSE_SOURCE'),
+                                        'ppa:stub/cassandra']),
+        install_keys=yaml.safe_dump([None, None]),
+        private_jre_url=get_jre_url())
+
+    @classmethod
+    @unittest.skipUnless(get_jre_url(), 'No Oracle JRE tarballs available')
+    @unittest.skipIf('DSE_SOURCE' not in os.environ,
+                     'DSE_SOURCE environment variable not configured')
+    def setUpClass(cls):
+        super(TestDSEDeployment, cls).setUpClass()
+
+
+class TestAllowAllAuthenticatorDeployment(Test3UnitDeployment):
+    test_config = dict(authenticator='AllowAllAuthenticator')
+
+    def cluster(self, username=None, password=None, hosts=None, port=9042):
+        '''A cluster using invalid credentials.'''
+        return super(TestAllowAllAuthenticatorDeployment,
+                     self).cluster(username='wat', password='eva')
+
+    def client_session(self, relname):
+        '''A session using invalid credentials.'''
+        relinfo = self.get_client_relinfo(relname)
+        self.assertIn('host', relinfo.keys())
+        cluster = self.cluster('random', 'nonsense',
+                               [relinfo['host']],
+                               int(relinfo['native_transport_port']))
+        session = cluster.connect()
+        self.addCleanup(session.shutdown)
+        return session
+
+    test_default_superuser_account_closed = None
+
+
+class Test20Deployment(Test1UnitDeployment):
+    """Tests run on a single node Apache Cassandra 2.0 cluster.
+    """
+    rf = 1
+    test_config = dict(
+        edition='community',
+        install_sources=yaml.safe_dump([
+            'ppa:stub/cassandra',
+            'ppa:openjdk-r/ppa',
+            'deb http://www.apache.org/dist/cassandra/debian 20x main']),
+        install_keys=yaml.safe_dump([None, None, None]))
+
+
+class Test21Deployment(Test1UnitDeployment):
+    """Tests run on a single node Apache Cassandra 2.1 cluster.
+    """
+    rf = 1
+    test_config = dict(
+        edition='community',
+        install_sources=yaml.safe_dump([
+            'ppa:stub/cassandra',
+            'ppa:openjdk-r/ppa',
+            'deb http://www.apache.org/dist/cassandra/debian 21x main']),
+        install_keys=yaml.safe_dump([None, None, None]))
+
+
+class Test30Deployment(Test1UnitDeployment):
+    """Tests run on a single node Apache Cassandra 3.0 cluster.
+ """ + rf = 1 + test_config = dict( + edition='community', + install_sources=yaml.safe_dump([ + 'ppa:stub/cassandra', + 'ppa:openjdk-r/ppa', + 'deb http://www.apache.org/dist/cassandra/debian 30x main']), + install_keys=yaml.safe_dump([None, None, None])) + + +# Bug #1417097 means we need to monkey patch Amulet for now. +real_juju = amulet.helpers.juju + + +@wraps(real_juju) +def patched_juju(args, env=None): + args = [str(a) for a in args] + return real_juju(args, env) + +amulet.helpers.juju = patched_juju +amulet.deployer.juju = patched_juju + + +if __name__ == '__main__': + unittest.main(verbosity=2) |