Diffstat (limited to 'charms/trusty/cassandra/tests/test_integration.py')
-rwxr-xr-x | charms/trusty/cassandra/tests/test_integration.py | 620
1 file changed, 0 insertions, 620 deletions
diff --git a/charms/trusty/cassandra/tests/test_integration.py b/charms/trusty/cassandra/tests/test_integration.py
deleted file mode 100755
index 8d91bce..0000000
--- a/charms/trusty/cassandra/tests/test_integration.py
+++ /dev/null
@@ -1,620 +0,0 @@
-#!.venv3/bin/python3
-#
-# Copyright 2015 Canonical Ltd.
-#
-# This file is part of the Cassandra Charm for Juju.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3, as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranties of
-# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
-# PURPOSE. See the GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import configparser
-from functools import wraps
-import glob
-import http.server
-from itertools import count
-import logging
-import multiprocessing
-import os
-import socket
-import subprocess
-import sys
-import time
-import unittest
-import uuid
-import warnings
-
-warnings.filterwarnings('ignore', 'The blist library is not available')
-
-import amulet.deployer
-import amulet.helpers
-from cassandra import Unavailable, ConsistencyLevel, AuthenticationFailed
-from cassandra.auth import PlainTextAuthProvider
-from cassandra.cluster import Cluster, NoHostAvailable
-from cassandra.query import SimpleStatement
-import yaml
-
-import helpers
-from testing.amuletfixture import AmuletFixture
-
-
-SERIES = os.environ.get('SERIES', 'trusty')
-
-WAIT_TIMEOUT = int(os.environ.get('AMULET_TIMEOUT', 3600))
-
-ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
-
-
-class TestDeploymentBase(unittest.TestCase):
-    rf = 1
-    deployment = None
-
-    common_config = dict(max_heap_size='96M',
-                         heap_newsize='4M')
-    test_config = dict()
-
-    @classmethod
-    def setUpClass(cls):
-        deployment = AmuletFixture(series=SERIES)
-        deployment.setUp()
-        cls.deployment = deployment
-
-        deployment.add('cassandra', units=cls.rf,
-                       constraints=dict(mem="2G"))
-        deployment.expose('cassandra')  # Tests need client access.
-        config = dict()
-        config.update(cls.common_config)
-        config.update(cls.test_config)  # Test subclass overrides
-        deployment.configure('cassandra', config)
-
-        deployment.add('storage',
-                       'cs:~stub/{}/storage'.format(SERIES))
-        deployment.configure('storage', dict(provider='local'))
-
-        # A stub client charm.
-        empty_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
-                                                  os.pardir, 'lib',
-                                                  'testcharms', 'empty'))
-        deployment.add('client', empty_path)
-        deployment.relate('cassandra:database', 'client:database')
-        deployment.relate('cassandra:database-admin', 'client:database-admin')
-
-        # No official trusty branch of the nrpe-external-master charm, yet.
-        # This is a problem as it means tests may not be running against
-        # the latest version.
-        deployment.add('nrpe',
-                       'cs:~stub/{}/nrpe-external-master'
-                       ''.format(SERIES))
-        deployment.relate('cassandra:nrpe-external-master',
-                          'nrpe:nrpe-external-master')
-
-        deployment.deploy(timeout=WAIT_TIMEOUT)
-
-        # Silence noise - we are testing the charm, not the Cassandra
-        # driver.
-        cassandra_log = logging.getLogger('cassandra')
-        cassandra_log.setLevel(logging.CRITICAL)
-
-    @classmethod
-    def tearDownClass(cls):
-        cls.deployment.tearDown()
-        cls.deployment = None
-
-    def juju_status(self):
-        status_yaml = subprocess.check_output(['juju', 'status',
-                                               '--format=yaml'])
-        if not status_yaml.strip():
-            return None
-        return yaml.safe_load(status_yaml)
-
-    def cluster(self, username=None, password=None, hosts=None, port=9042):
-        status = self.juju_status()
-
-        if username is None or password is None:
-            # Get some valid credentials - unit's superuser account will do.
-            unit = sorted(status['services']['cassandra']['units'].keys())[0]
-            cqlshrc_path = helpers.get_cqlshrc_path()
-            cqlshrc = configparser.ConfigParser(interpolation=None)
-            cqlshrc.read_string(
-                self.deployment.sentry[unit].file_contents(cqlshrc_path))
-            username = cqlshrc['authentication']['username']
-            password = cqlshrc['authentication']['password']
-
-        auth_provider = PlainTextAuthProvider(username=username,
-                                              password=password)
-
-        if hosts is None:
-            # Get the IP addresses
-            hosts = []
-            for unit, d in status['services']['cassandra']['units'].items():
-                hosts.append(d['public-address'])
-        cluster = Cluster(hosts, auth_provider=auth_provider, port=port)
-        self.addCleanup(cluster.shutdown)
-        return cluster
-
-    def session(self):
-        '''A session using the server's superuser credentials.'''
-        session = self.cluster().connect()
-        self.addCleanup(session.shutdown)
-        return session
-
-    def client_session(self, relname):
-        '''A session using the client's credentials.
-
-        We currently just steal the client's credentials and use
-        them from the local machine, but we could tunnel through the
-        client with a little more effort.
-        '''
-        relinfo = self.get_client_relinfo(relname)
-        self.assertIn('host', relinfo.keys())
-        cluster = self.cluster(relinfo['username'],
-                               relinfo['password'],
-                               [relinfo['host']],
-                               int(relinfo['native_transport_port']))
-        session = cluster.connect()
-        self.addCleanup(session.shutdown)
-        return session
-
-    keyspace_ids = count()
-
-    def new_keyspace(self, session, rf=None):
-        if rf is None:
-            # We create a keyspace with a replication factor equal
-            # to the number of units. This ensures that all records
-            # are replicated to all nodes, and we can confirm that
-            # all nodes are working by doing an insert with
-            # ConsistencyLevel.ALL.
-            rf = self.rf
-        keyspace = 'test{}'.format(next(TestDeploymentBase.keyspace_ids))
-        q = SimpleStatement(
-            'CREATE KEYSPACE {} WITH REPLICATION ='.format(keyspace) +
-            "{'class': 'SimpleStrategy', 'replication_factor': %s}",
-            consistency_level=ConsistencyLevel.ALL)
-        session.execute(q, (rf,))
-        session.set_keyspace(keyspace)
-        return keyspace
-
-    def get_client_relinfo(self, relname):
-        # We only need one unit, even if rf > 1
-        s = self.deployment.sentry['cassandra'][0]
-        relinfo = s.relation(relname, 'client:{}'.format(relname))
-        return relinfo
-
-    def is_port_open(self, port):
-        status = self.juju_status()
-        detail = list(status['services']['cassandra']['units'].values())[0]
-        address = detail['public-address']
-        rc = subprocess.call(['nc', '-z', '-w', '2', address, str(port)])
-        return rc == 0
-
-    def reconfigure_cassandra(self, **overrides):
-        config = dict()
-        config.update(self.common_config)
-        config.update(self.test_config)
-        config.update(overrides)
-        self.deployment.configure('cassandra', config)
-        self.deployment.wait()
-
-
-class Test1UnitDeployment(TestDeploymentBase):
-    """Tests run on both a single node cluster and a 3 node cluster."""
-    rf = 1
-    test_config = dict(jre='openjdk')
-
-    def test_basics_unit_superuser(self):
-        # Basic tests using unit superuser credentials
-        session = self.session()
-        self.new_keyspace(session)
-        self._test_database_basics(session)
-
-    def test_basics_client_relation(self):
-        # Create a keyspace using superuser credentials
-        super_session = self.session()
-        keyspace = self.new_keyspace(super_session)
-
-        # Basic tests using standard client relation credentials.
-        session = self.client_session('database')
-        session.set_keyspace(keyspace)
-        self._test_database_basics(session)
-
-    def test_basics_client_admin_relation(self):
-        # Basic tests using administrative client relation credentials.
-        session = self.client_session('database-admin')
-        self.new_keyspace(session)
-        self._test_database_basics(session)
-
-    def _test_database_basics(self, session):
-        session.execute('CREATE TABLE Foo (x varchar PRIMARY KEY)')
-
-        # Insert some data, ensuring that it has been stored on
-        # all of our juju units. Note that the replication factor
-        # of our keyspace has been set to the number of units we
-        # deployed. Because it might take a while for the cluster to get
-        # its act together, we retry this in a loop with a timeout.
-        timeout = time.time() + 120
-        while True:
-            value = 'hello {}'.format(time.time())
-            query = SimpleStatement(
-                "INSERT INTO Foo (x) VALUES (%s)",
-                consistency_level=ConsistencyLevel.ALL)
-            try:
-                session.execute(query, (value,))
-                break
-            except Exception:
-                if time.time() > timeout:
-                    raise
-
-        # We can get the data out again. This isn't testing our charm,
-        # but nice to know anyway...
-        r = session.execute('SELECT * FROM Foo LIMIT 1')
-        self.assertTrue(r[0].x.startswith('hello'))
-
-    def test_external_mount(self):
-        # Not only does this test migrating data from local disk to an
-        # external mount, it also exercises the rolling restart logic.
-        # If rf==1, the restart will happen in the
-        # storage-relation-changed hook as soon as the mount is ready.
-        # If rf > 1, the restart will happen in the
-        # cluster-relation-changed hook once the unit has determined
-        # that it is its turn to restart.
-
-        # First, create a keyspace pre-migration so we can confirm the
-        # data was migrated rather than being reset to an empty system.
-        session = self.session()
-        keyspace = self.new_keyspace(session)
-        session.execute('CREATE TABLE dat (x varchar PRIMARY KEY)')
-        total = self.rf * 50
-        q = SimpleStatement('INSERT INTO dat (x) VALUES (%s)')
-        for _ in range(0, total):
-            session.execute(q, (str(uuid.uuid1()),))
-        session.shutdown()
-
-        self.deployment.relate('cassandra:data', 'storage:data')
-        self.deployment.wait()
-        # Per Bug #1254766 and Bug #1254766, the sentry.wait() above
-        # will return before the hooks have actually finished running
-        # and data migrated. Instead, keep checking until our condition
-        # is met, or a timeout reached.
-        timeout = time.time() + 300
-        for s in self.deployment.sentry['cassandra']:
-            unit = s.info['unit_name']
-            unit_num = s.info['unit']
-            with self.subTest(unit=unit):
-                while True:
-                    # Attempting to diagnose Amulet failures. I suspect
-                    # SSH host keys again, per Bug #802117
-                    try:
-                        s.directory_contents('/')
-                    except (subprocess.CalledProcessError, OSError):
-                        self.skipTest('sentry[{!r}].directory_contents({!r}) '
-                                      'failed!'.format(unit, '/'))
-                    parents = ['/srv', '/srv/cassandra_{}'.format(unit_num),
-                               '/srv/cassandra_{}/cassandra'.format(unit_num)]
-                    for path in parents:
-                        try:
-                            s.directory_contents(path)
-                        except (subprocess.CalledProcessError, OSError):
-                            raise AssertionError('Failed to scan {!r} on {}'
-                                                 .format(path, unit))
-                    try:
-                        contents = s.directory_contents(
-                            '/srv/cassandra_{}/cassandra/data'.format(
-                                unit_num))
-                        found = set(contents['directories'])
-                        self.assertIn(keyspace, found)
-                        self.assertIn('system', found)
-                        break
-                    except Exception:
-                        if time.time() > timeout:
-                            raise
-                    time.sleep(5)
-
-        # Confirm no data has been lost, which could happen if we badly
-        # shut down and memtables were not flushed.
-        session = self.session()
-        session.set_keyspace(keyspace)
-        q = SimpleStatement('SELECT COUNT(*) FROM dat',
-                            consistency_level=ConsistencyLevel.QUORUM)
-        results = session.execute(q)
-        self.assertEqual(results[0][0], total)
-
-    def test_cluster_ports_closed(self):
-        # The internal Cassandra ports are protected by ufw firewall
-        # rules, and are closed to everyone except for peers and the
-        # force_seed_nodes list. This is required for security, since
-        # the protocols are unauthenticated. It also stops rogue nodes
-        # on failed units from rejoining the cluster and causing chaos.
-        self.assertFalse(self.is_port_open(7000), 'Storage port open')
-        self.assertFalse(self.is_port_open(7001), 'SSL Storage port open')
-        self.assertFalse(self.is_port_open(7199), 'JMX port open')
-
-    def test_client_ports_open(self):
-        self.assertTrue(self.is_port_open(9042), 'Native trans port closed')
-        self.assertTrue(self.is_port_open(9160), 'Thrift RPC port closed')
-
-    def test_default_superuser_account_closed(self):
-        cluster = self.cluster(username='cassandra', password='cassandra')
-        try:
-            cluster.connect()
-            self.fail('Default credentials not reset')
-        except NoHostAvailable as x:
-            for fail in x.errors.values():
-                self.assertIsInstance(fail, AuthenticationFailed)
-
-    def test_cqlsh(self):
-        unit = self.deployment.sentry['cassandra'][0].info['unit_name']
-        subprocess.check_output(['juju', 'ssh', unit,
-                                 'sudo -H cqlsh -e exit'],
-                                stderr=subprocess.STDOUT)
-
-    def test_z_add_and_drop_node(self):  # 'z' to run this test last.
-        # We need to be able to add a node correctly into the ring,
-        # without an operator needing to repair keyspaces to ensure data
-        # is located on the expected nodes.
-        # To test this, first create a keyspace with rf==1 and put enough
-        # data in it so each node will have some.
-        cluster = self.cluster()
-        s = cluster.connect()
-        keyspace = self.new_keyspace(s, rf=1)
-        q = SimpleStatement('CREATE TABLE dat (x varchar PRIMARY KEY)',
-                            consistency_level=ConsistencyLevel.ALL)
-        s.execute(q)
-
-        total = self.rf * 50
-        q = SimpleStatement('INSERT INTO dat (x) VALUES (%s)',
-                            consistency_level=ConsistencyLevel.QUORUM)
-        for _ in range(0, total):
-            s.execute(q, (str(uuid.uuid1()),))
-        cluster.shutdown()
-
-        def count(expected):
-            until = time.time() + 180
-            while True:
-                cluster = self.cluster()
-                try:
-                    s = cluster.connect(keyspace)
-                    results = s.execute(SimpleStatement(
-                        'SELECT count(*) FROM dat',
-                        consistency_level=ConsistencyLevel.QUORUM))
-                    found = results[0][0]
-                    if found == expected or time.time() > until:
-                        return found
-                    time.sleep(0.2)
-                except Unavailable:
-                    if time.time() > until:
-                        raise
-                finally:
-                    cluster.shutdown()
-
-        self.assertEqual(count(total), total)
-
-        self.deployment.add_unit('cassandra')
-        self.deployment.wait()
-        status = self.juju_status()
-        unit = sorted(status['services']['cassandra']['units'].keys())[-1]
-        try:
-            self.assertEqual(count(total), total)
-        finally:
-            # When a node is dropped, it needs to decommission itself and
-            # move its data to the remaining nodes so no data is lost.
-            # Alas, per Bug #1417874 we can't yet do this with Juju.
-            # First, the node must be manually decommissioned before we
-            # remove the unit.
-            self._decommission(unit)
-            self.deployment.remove_unit(unit)
-            self.deployment.wait()
-
-        self.assertEqual(count(total), total)
-
-    def _decommission(self, unit):
-        until = time.time() + WAIT_TIMEOUT
-        while True:
-            try:
-                subprocess.check_output(['juju', 'run', '--unit', unit,
-                                         'nodetool decommission'],
-                                        stderr=subprocess.STDOUT,
-                                        universal_newlines=True)
-                break
-            except subprocess.CalledProcessError:
-                if time.time() > until:
-                    raise
-
-        until = time.time() + WAIT_TIMEOUT
-        while True:
-            try:
-                cmd = ['juju', 'run', '--unit', unit, 'nodetool netstats']
-                raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT,
-                                              universal_newlines=True)
-                if 'Mode: DECOMMISSIONED' in raw:
-                    return
-                if time.time() > until:
-                    raise subprocess.TimeoutExpired(cmd, WAIT_TIMEOUT, raw)
-            except subprocess.CalledProcessError:
-                if time.time() > until:
-                    raise
-            time.sleep(3)
-
-
-class Test3UnitDeployment(Test1UnitDeployment):
-    """Tests run on a three node cluster."""
-    rf = 3
-
-
-_jre_url = None
-
-
-def _serve(cwd, host, port):
-    sys.stderr = open('/dev/null', 'w')
-    os.chdir(cwd)
-    httpd = http.server.HTTPServer((host, port),
-                                   http.server.SimpleHTTPRequestHandler)
-    httpd.serve_forever()
-
-
-_procs = []
-
-
-def get_jre_url():
-    '''Return the URL to the Oracle Java SE 8 Server Runtime tarball, or None.
-
-    The tarball needs to be placed in ../lib.
-
-    Spawns a web server as a subprocess to serve the file.
-    '''
-    global _jre_url
-    if _jre_url is not None:
-        return _jre_url
-
-    jre_dir = os.path.join(ROOT, 'lib')
-
-    jre_tarballs = sorted(glob.glob(os.path.join(jre_dir,
-                                                 'server-jre-?u*.tar.gz')))
-    if not jre_tarballs:
-        return None
-
-    # Get the local IP address, only available via hackish means and
-    # quite possibly incorrect.
-    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    s.connect(('www.canonical.com', 80))
-    host = s.getsockname()[0]
-    s.close()
-
-    # Get a free port.
-    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    s.bind((host, 0))
-    port = s.getsockname()[1]
-    s.close()
-
-    p = multiprocessing.Process(target=_serve, args=(jre_dir, host, port),
-                                daemon=True)
-    p.start()
-    _procs.append(p)
-
-    _jre_url = 'http://{}:{}/{}'.format(host, port,
-                                        os.path.basename(jre_tarballs[-1]))
-    return _jre_url
-
-
-class TestOracleJREDeployment(Test1UnitDeployment):
-    """Basic test with the Oracle JRE.
-
-    Unfortunately these tests cannot be run by the automatic test runners,
-    as the Oracle JRE is protected from public download by Oracle's
-    click-through license agreement.
-    """
-    rf = 1
-    test_config = dict(jre='Oracle', edition='community',
-                       private_jre_url=get_jre_url())
-
-    @classmethod
-    @unittest.skipUnless(get_jre_url(), 'No Oracle JRE tarballs available')
-    def setUpClass(cls):
-        super(TestOracleJREDeployment, cls).setUpClass()
-
-
-class TestDSEDeployment(Test1UnitDeployment):
-    """Tests run on a single node DataStax Enterprise cluster.
-
-    Unfortunately these tests cannot be run by the automatic test
-    runners, as the DSE packages are not available for public download.
-    """
-    rf = 1
-    test_config = dict(
-        edition='DSE',  # Forces Oracle JRE
-        install_sources=yaml.safe_dump([os.environ.get('DSE_SOURCE'),
-                                        'ppa:stub/cassandra']),
-        install_keys=yaml.safe_dump([None, None]),
-        private_jre_url=get_jre_url())
-
-    @classmethod
-    @unittest.skipUnless(get_jre_url(), 'No Oracle JRE tarballs available')
-    @unittest.skipIf('DSE_SOURCE' not in os.environ,
-                     'DSE_SOURCE environment variable not configured')
-    def setUpClass(cls):
-        super(TestDSEDeployment, cls).setUpClass()
-
-
-class TestAllowAllAuthenticatorDeployment(Test3UnitDeployment):
-    test_config = dict(authenticator='AllowAllAuthenticator')
-
-    def cluster(self, username=None, password=None, hosts=None, port=9042):
-        '''A cluster using invalid credentials.'''
-        return super(TestAllowAllAuthenticatorDeployment,
-                     self).cluster(username='wat', password='eva')
-
-    def client_session(self, relname):
-        '''A session using invalid credentials.'''
-        relinfo = self.get_client_relinfo(relname)
-        self.assertIn('host', relinfo.keys())
-        cluster = self.cluster('random', 'nonsense',
-                               [relinfo['host']],
-                               int(relinfo['native_transport_port']))
-        session = cluster.connect()
-        self.addCleanup(session.shutdown)
-        return session
-
-    test_default_superuser_account_closed = None
-
-
-class Test20Deployment(Test1UnitDeployment):
-    """Tests run on a single node Apache Cassandra 2.0 cluster.
-    """
-    rf = 1
-    test_config = dict(
-        edition='community',
-        install_sources=yaml.safe_dump([
-            'ppa:stub/cassandra',
-            'ppa:openjdk-r/ppa',
-            'deb http://www.apache.org/dist/cassandra/debian 20x main']),
-        install_keys=yaml.safe_dump([None, None, None]))
-
-
-class Test21Deployment(Test1UnitDeployment):
-    """Tests run on a single node Apache Cassandra 2.1 cluster.
-    """
-    rf = 1
-    test_config = dict(
-        edition='community',
-        install_sources=yaml.safe_dump([
-            'ppa:stub/cassandra',
-            'ppa:openjdk-r/ppa',
-            'deb http://www.apache.org/dist/cassandra/debian 21x main']),
-        install_keys=yaml.safe_dump([None, None, None]))
-
-
-class Test30Deployment(Test1UnitDeployment):
-    """Tests run on a single node Apache Cassandra 3.0 cluster.
- """ - rf = 1 - test_config = dict( - edition='community', - install_sources=yaml.safe_dump([ - 'ppa:stub/cassandra', - 'ppa:openjdk-r/ppa', - 'deb http://www.apache.org/dist/cassandra/debian 30x main']), - install_keys=yaml.safe_dump([None, None, None])) - - -# Bug #1417097 means we need to monkey patch Amulet for now. -real_juju = amulet.helpers.juju - - -@wraps(real_juju) -def patched_juju(args, env=None): - args = [str(a) for a in args] - return real_juju(args, env) - -amulet.helpers.juju = patched_juju -amulet.deployer.juju = patched_juju - - -if __name__ == '__main__': - unittest.main(verbosity=2) |