From 4faa7f927149a5c4ef7a03523f7bc14523cb9baa Mon Sep 17 00:00:00 2001
From: Stuart Mackie
Date: Fri, 7 Oct 2016 12:24:58 -0700
Subject: Charms for Contrail 3.1 with Mitaka

Change-Id: Id37f3b9743d1974e31fcd7cd9c54be41bb0c47fb
Signed-off-by: Stuart Mackie
---
 charms/trusty/cassandra/hooks/helpers.py | 1084 ++++++++++++++++++++++++++++++
 1 file changed, 1084 insertions(+)
 create mode 100644 charms/trusty/cassandra/hooks/helpers.py

diff --git a/charms/trusty/cassandra/hooks/helpers.py b/charms/trusty/cassandra/hooks/helpers.py
new file mode 100644
index 0000000..b86a6b1
--- /dev/null
+++ b/charms/trusty/cassandra/hooks/helpers.py
@@ -0,0 +1,1084 @@
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import configparser
+from contextlib import contextmanager
+from datetime import timedelta
+from distutils.version import LooseVersion
+import errno
+from functools import wraps
+import io
+import json
+import os.path
+import re
+import shutil
+import subprocess
+import sys
+import tempfile
+from textwrap import dedent
+import time
+
+import bcrypt
+from cassandra import ConsistencyLevel
+import cassandra.auth
+import cassandra.cluster
+import cassandra.query
+import yaml
+
+from charmhelpers.core import hookenv, host
+from charmhelpers.core.hookenv import DEBUG, ERROR, WARNING
+from charmhelpers import fetch
+
+from coordinator import coordinator
+
+
+RESTART_TIMEOUT = 600
+
+
+def logged(func):
+    @wraps(func)
+    def wrapper(*args, **kw):
+        hookenv.log("* Helper {}/{}".format(hookenv.hook_name(),
+                                            func.__name__))
+        return func(*args, **kw)
+    return wrapper
+
+
+def backoff(what_for, max_pause=60):
+    i = 0
+    while True:
+        yield True
+        i += 1
+        pause = min(max_pause, 2 ** i)
+        time.sleep(pause)
+        if pause > 10:
+            hookenv.log('Recheck {} for {}'.format(i, what_for))
+
+
+# FOR CHARMHELPERS
+@contextmanager
+def autostart_disabled(services=None, _policy_rc='/usr/sbin/policy-rc.d'):
+    '''Tell well-behaved Debian packages to not start services when installed.
+    '''
+    script = ['#!/bin/sh']
+    if services is not None:
+        for service in services:
+            script.append(
+                'if [ "$1" = "{}" ]; then exit 101; fi'.format(service))
+        script.append('exit 0')
+    else:
+        script.append('exit 101')  # By default, all daemons disabled.
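+    # invoke-rc.d consults /usr/sbin/policy-rc.d before starting a
+    # service; an exit status of 101 from that script means the action
+    # is forbidden by policy.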
+    try:
+        if os.path.exists(_policy_rc):
+            shutil.move(_policy_rc, "{}-orig".format(_policy_rc))
+        host.write_file(_policy_rc, '\n'.join(script).encode('ASCII'),
+                        perms=0o555)
+        yield
+    finally:
+        os.unlink(_policy_rc)
+        if os.path.exists("{}-orig".format(_policy_rc)):
+            shutil.move("{}-orig".format(_policy_rc), _policy_rc)
+
+
+# FOR CHARMHELPERS
+@logged
+def install_packages(packages):
+    packages = list(packages)
+    if hookenv.config('extra_packages'):
+        packages.extend(hookenv.config('extra_packages').split())
+    packages = fetch.filter_installed_packages(packages)
+    if packages:
+        # The DSE packages are huge, so this might take some time.
+        status_set('maintenance', 'Installing packages')
+        with autostart_disabled(['cassandra']):
+            fetch.apt_install(packages, fatal=True)
+
+
+# FOR CHARMHELPERS
+@logged
+def ensure_package_status(packages):
+    config_dict = hookenv.config()
+
+    package_status = config_dict['package_status']
+
+    if package_status not in ['install', 'hold']:
+        raise RuntimeError("package_status must be 'install' or 'hold', "
+                           "not {!r}".format(package_status))
+
+    selections = []
+    for package in packages:
+        selections.append('{} {}\n'.format(package, package_status))
+    dpkg = subprocess.Popen(['dpkg', '--set-selections'],
+                            stdin=subprocess.PIPE)
+    dpkg.communicate(input=''.join(selections).encode('US-ASCII'))
+
+
+def get_seed_ips():
+    '''Return the set of seed ip addresses.
+
+    We use ip addresses rather than unit names, as we may need to use
+    external seed ips at some point.
+    '''
+    return set((hookenv.leader_get('seeds') or '').split(','))
+
+
+def actual_seed_ips():
+    '''Return the seeds currently in cassandra.yaml'''
+    cassandra_yaml = read_cassandra_yaml()
+    s = cassandra_yaml['seed_provider'][0]['parameters'][0]['seeds']
+    return set(s.split(','))
+
+
+def get_database_directory(config_path):
+    '''Convert a database path from the service config to an absolute path.
+
+    Entries in the config file may be absolute, relative to
+    /var/lib/cassandra, or relative to the mountpoint.
+    '''
+    import relations
+    storage = relations.StorageRelation()
+    if storage.mountpoint:
+        root = os.path.join(storage.mountpoint, 'cassandra')
+    else:
+        root = '/var/lib/cassandra'
+    return os.path.join(root, config_path)
+
+
+def ensure_database_directory(config_path):
+    '''Create the database directory if it doesn't exist, resetting
+    ownership and other settings while we are at it.
+
+    Returns the absolute path.
+    '''
+    absdir = get_database_directory(config_path)
+
+    # Work around Bug #1427150 by ensuring components of the path are
+    # created with the required permissions, if necessary.
+    component = os.sep
+    for p in absdir.split(os.sep)[1:-1]:
+        component = os.path.join(component, p)
+        if not os.path.exists(component):
+            host.mkdir(component)
+    assert component == os.path.split(absdir)[0]
+    host.mkdir(absdir, owner='cassandra', group='cassandra', perms=0o750)
+    return absdir
+
+
+def get_all_database_directories():
+    config = hookenv.config()
+    dirs = dict(
+        data_file_directories=[get_database_directory(d)
+                               for d in (config['data_file_directories'] or
+                                         'data').split()],
+        commitlog_directory=get_database_directory(
+            config['commitlog_directory'] or 'commitlog'),
+        saved_caches_directory=get_database_directory(
+            config['saved_caches_directory'] or 'saved_caches'))
+    if has_cassandra_version('3.0'):
+        # Not yet configurable. Make configurable with Juju native storage.
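+        # Cassandra 3.0 moved hints from the system.hints table to flat
+        # files on disk, so recent versions need a hints directory too.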
+        dirs['hints_directory'] = get_database_directory('hints')
+    return dirs
+
+
+def mountpoint(path):
+    '''Return the mountpoint that path exists on.'''
+    path = os.path.realpath(path)
+    while path != '/' and not os.path.ismount(path):
+        path = os.path.dirname(path)
+    return path
+
+
+# FOR CHARMHELPERS
+def is_lxc():
+    '''Return True if we are running inside an LXC container.'''
+    with open('/proc/1/cgroup', 'r') as f:
+        return ':/lxc/' in f.readline()
+
+
+# FOR CHARMHELPERS
+def set_io_scheduler(io_scheduler, directory):
+    '''Set the block device io scheduler.'''
+
+    assert os.path.isdir(directory)
+
+    # The block device regex may be a tad simplistic.
+    block_regex = re.compile(r'/dev/([a-z]*)', re.IGNORECASE)
+
+    output = subprocess.check_output(['df', directory],
+                                     universal_newlines=True)
+
+    if not is_lxc():
+        hookenv.log("Setting block device of {} to IO scheduler {}"
+                    "".format(directory, io_scheduler))
+        try:
+            block_dev = re.findall(block_regex, output)[0]
+        except IndexError:
+            hookenv.log("Unable to locate block device of {} (in container?)"
+                        "".format(directory))
+            return
+        sys_file = os.path.join("/", "sys", "block", block_dev,
+                                "queue", "scheduler")
+        try:
+            host.write_file(sys_file, io_scheduler.encode('ascii'),
+                            perms=0o644)
+        except OSError as e:
+            if e.errno == errno.EACCES:
+                hookenv.log("Got Permission Denied trying to set the "
+                            "IO scheduler at {}. We may be in an LXC. "
+                            "Exiting gracefully".format(sys_file),
+                            WARNING)
+            elif e.errno == errno.ENOENT:
+                hookenv.log("Got no such file or directory trying to "
+                            "set the IO scheduler at {}. It may be "
+                            "this is an LXC, the device name is as "
+                            "yet unknown to the charm, or LVM/RAID is "
+                            "hiding the underlying device name. "
+                            "Exiting gracefully".format(sys_file),
+                            WARNING)
+            else:
+                raise
+    else:
+        # Make no change if we are in an LXC
+        hookenv.log("In an LXC. Cannot set io scheduler {}"
+                    "".format(io_scheduler))
+
+
+# FOR CHARMHELPERS
+def recursive_chown(directory, owner="root", group="root"):
+    '''Change ownership of all files and directories in 'directory'.
+
+    Ownership of 'directory' is also reset.
+    '''
+    shutil.chown(directory, owner, group)
+    for root, dirs, files in os.walk(directory):
+        for dirname in dirs:
+            shutil.chown(os.path.join(root, dirname), owner, group)
+        for filename in files:
+            shutil.chown(os.path.join(root, filename), owner, group)
+
+
+def maybe_backup(path):
+    '''Copy a file to file.orig, if file.orig does not already exist.'''
+    backup_path = path + '.orig'
+    if not os.path.exists(backup_path):
+        with open(path, 'rb') as f:
+            host.write_file(backup_path, f.read(), perms=0o600)
+
+
+# FOR CHARMHELPERS
+def get_package_version(package):
+    cache = fetch.apt_cache()
+    if package not in cache:
+        return None
+    pkgver = cache[package].current_ver
+    if pkgver is not None:
+        return pkgver.ver_str
+    return None
+
+
+def get_jre():
+    # DataStax Enterprise requires the Oracle JRE.
+    if get_cassandra_edition() == 'dse':
+        return 'oracle'
+
+    config = hookenv.config()
+    jre = config['jre'].lower()
+    if jre not in ('openjdk', 'oracle'):
+        hookenv.log('Unknown JRE {!r} specified. Using OpenJDK'.format(jre),
+                    ERROR)
+        jre = 'openjdk'
+    return jre
+
+
+def get_cassandra_edition():
+    config = hookenv.config()
+    edition = config['edition'].lower()
+    if edition not in ('community', 'dse'):
+        hookenv.log('Unknown edition {!r}. Using community.'.format(edition),
+                    ERROR)
+        edition = 'community'
+    return edition
+
+
+def get_cassandra_service():
+    '''Cassandra upstart service'''
+    if get_cassandra_edition() == 'dse':
+        return 'dse'
+    return 'cassandra'
+
+
+def get_cassandra_version():
+    if get_cassandra_edition() == 'dse':
+        dse_ver = get_package_version('dse-full')
+        if not dse_ver:
+            return None
+        elif LooseVersion(dse_ver) >= LooseVersion('5.0'):
+            return '3.0'
+        elif LooseVersion(dse_ver) >= LooseVersion('4.7'):
+            return '2.1'
+        else:
+            return '2.0'
+    return get_package_version('cassandra')
+
+
+def has_cassandra_version(minimum_ver):
+    cassandra_version = get_cassandra_version()
+    assert cassandra_version is not None, 'Cassandra package not yet installed'
+    return LooseVersion(cassandra_version) >= LooseVersion(minimum_ver)
+
+
+def get_cassandra_config_dir():
+    if get_cassandra_edition() == 'dse':
+        return '/etc/dse/cassandra'
+    else:
+        return '/etc/cassandra'
+
+
+def get_cassandra_yaml_file():
+    return os.path.join(get_cassandra_config_dir(), "cassandra.yaml")
+
+
+def get_cassandra_env_file():
+    return os.path.join(get_cassandra_config_dir(), "cassandra-env.sh")
+
+
+def get_cassandra_rackdc_file():
+    return os.path.join(get_cassandra_config_dir(),
+                        "cassandra-rackdc.properties")
+
+
+def get_cassandra_pid_file():
+    edition = get_cassandra_edition()
+    if edition == 'dse':
+        pid_file = "/var/run/dse/dse.pid"
+    else:
+        pid_file = "/var/run/cassandra/cassandra.pid"
+    return pid_file
+
+
+def get_cassandra_packages():
+    edition = get_cassandra_edition()
+    if edition == 'dse':
+        packages = set(['dse-full'])
+    else:
+        packages = set(['cassandra'])  # 'cassandra-tools'
+
+    packages.add('ntp')
+    packages.add('run-one')
+    packages.add('netcat')
+
+    jre = get_jre()
+    if jre == 'oracle':
+        # We can't use a packaged version of the Oracle JRE, as we
+        # are not allowed to bypass Oracle's click through license
+        # agreement.
+        pass
+    else:
+        # NB. OpenJDK 8 not available in trusty. This needs to come
+        # from a PPA or some other configured source.
+        packages.add('openjdk-8-jre-headless')
+
+    return packages
+
+
+@logged
+def stop_cassandra():
+    if is_cassandra_running():
+        hookenv.log('Shutting down Cassandra')
+        host.service_stop(get_cassandra_service())
+    if is_cassandra_running():
+        hookenv.status_set('blocked', 'Cassandra failed to shut down')
+        raise SystemExit(0)
+
+
+@logged
+def start_cassandra():
+    if is_cassandra_running():
+        return
+
+    actual_seeds = sorted(actual_seed_ips())
+    assert actual_seeds, 'Attempting to start cassandra with empty seed list'
+    hookenv.config()['configured_seeds'] = actual_seeds
+
+    if is_bootstrapped():
+        status_set('maintenance',
+                   'Starting Cassandra with seeds {!r}'
+                   .format(','.join(actual_seeds)))
+    else:
+        status_set('maintenance',
+                   'Bootstrapping with seeds {}'
+                   .format(','.join(actual_seeds)))
+
+    host.service_start(get_cassandra_service())
+
+    # Wait for Cassandra to actually start, or abort.
+    timeout = time.time() + RESTART_TIMEOUT
+    while time.time() < timeout:
+        if is_cassandra_running():
+            return
+        time.sleep(1)
+    status_set('blocked', 'Cassandra failed to start')
+    raise SystemExit(0)
+
+
+@logged
+def reconfigure_and_restart_cassandra(overrides={}):
+    stop_cassandra()
+    configure_cassandra_yaml(overrides)
+    start_cassandra()
+
+
+@logged
+def remount_cassandra():
+    '''If a new mountpoint is ready, migrate data across to it.'''
+    assert not is_cassandra_running()  # Guard against data loss.
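+    # Imported at call time rather than at module level, presumably to
+    # avoid a circular import with relations.py, which appears to depend
+    # on this module.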
+ import relations + storage = relations.StorageRelation() + if storage.needs_remount(): + status_set('maintenance', 'Migrating data to new mountpoint') + hookenv.config()['bootstrapped_into_cluster'] = False + if storage.mountpoint is None: + hookenv.log('External storage AND DATA gone. ' + 'Reverting to local storage. ' + 'In danger of resurrecting old data. ', + WARNING) + else: + storage.migrate('/var/lib/cassandra', 'cassandra') + root = os.path.join(storage.mountpoint, 'cassandra') + os.chmod(root, 0o750) + + +@logged +def ensure_database_directories(): + '''Ensure that directories Cassandra expects to store its data in exist.''' + # Guard against changing perms on a running db. Although probably + # harmless, it causes shutil.chown() to fail. + assert not is_cassandra_running() + db_dirs = get_all_database_directories() + ensure_database_directory(db_dirs['commitlog_directory']) + ensure_database_directory(db_dirs['saved_caches_directory']) + if 'hints_directory' in db_dirs: + ensure_database_directory(db_dirs['hints_directory']) + for db_dir in db_dirs['data_file_directories']: + ensure_database_directory(db_dir) + + +CONNECT_TIMEOUT = 10 + + +@contextmanager +def connect(username=None, password=None, timeout=CONNECT_TIMEOUT, + auth_timeout=CONNECT_TIMEOUT): + # We pull the currently configured listen address and port from the + # yaml, rather than the service configuration, as it may have been + # overridden. + cassandra_yaml = read_cassandra_yaml() + address = cassandra_yaml['rpc_address'] + if address == '0.0.0.0': + address = 'localhost' + port = cassandra_yaml['native_transport_port'] + + if username is None or password is None: + username, password = superuser_credentials() + + auth = hookenv.config()['authenticator'] + if auth == 'AllowAllAuthenticator': + auth_provider = None + else: + auth_provider = cassandra.auth.PlainTextAuthProvider(username=username, + password=password) + + # Although we specify a reconnection_policy, it does not apply to + # the initial connection so we retry in a loop. + start = time.time() + until = start + timeout + auth_until = start + auth_timeout + while True: + cluster = cassandra.cluster.Cluster([address], port=port, + auth_provider=auth_provider) + try: + session = cluster.connect() + session.default_timeout = timeout + break + except cassandra.cluster.NoHostAvailable as x: + cluster.shutdown() + now = time.time() + # If every node failed auth, reraise one of the + # AuthenticationFailed exceptions. Unwrapping the exception + # means call sites don't have to sniff the exception bundle. + # We don't retry on auth fails; this method should not be + # called if the system_auth data is inconsistent. 
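+            # NoHostAvailable.errors maps each contacted endpoint to the
+            # exception that endpoint raised.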
+ auth_fails = [af for af in x.errors.values() + if isinstance(af, cassandra.AuthenticationFailed)] + if auth_fails: + if now > auth_until: + raise auth_fails[0] + if now > until: + raise + time.sleep(1) + try: + yield session + finally: + cluster.shutdown() + + +QUERY_TIMEOUT = 60 + + +def query(session, statement, consistency_level, args=None): + q = cassandra.query.SimpleStatement(statement, + consistency_level=consistency_level) + + until = time.time() + QUERY_TIMEOUT + for _ in backoff('query to execute'): + try: + return session.execute(q, args) + except Exception: + if time.time() > until: + raise + + +def encrypt_password(password): + return bcrypt.hashpw(password, bcrypt.gensalt()) + + +@logged +def ensure_user(session, username, encrypted_password, superuser=False): + '''Create the DB user if it doesn't already exist & reset the password.''' + auth = hookenv.config()['authenticator'] + if auth == 'AllowAllAuthenticator': + return # No authentication means we cannot create users + + if superuser: + hookenv.log('Creating SUPERUSER {}'.format(username)) + else: + hookenv.log('Creating user {}'.format(username)) + if has_cassandra_version('2.2'): + query(session, + 'INSERT INTO system_auth.roles ' + '(role, can_login, is_superuser, salted_hash) ' + 'VALUES (%s, TRUE, %s, %s)', + ConsistencyLevel.ALL, + (username, superuser, encrypted_password)) + else: + query(session, + 'INSERT INTO system_auth.users (name, super) VALUES (%s, %s)', + ConsistencyLevel.ALL, (username, superuser)) + query(session, + 'INSERT INTO system_auth.credentials (username, salted_hash) ' + 'VALUES (%s, %s)', + ConsistencyLevel.ALL, (username, encrypted_password)) + + +@logged +def create_unit_superuser_hard(): + '''Create or recreate the unit's superuser account. + + This method is used when there are no known superuser credentials + to use. We restart the node using the AllowAllAuthenticator and + insert our credentials directly into the system_auth keyspace. + ''' + username, password = superuser_credentials() + pwhash = encrypt_password(password) + hookenv.log('Creating unit superuser {}'.format(username)) + + # Restart cassandra without authentication & listening on localhost. + reconfigure_and_restart_cassandra( + dict(authenticator='AllowAllAuthenticator', rpc_address='localhost')) + for _ in backoff('superuser creation'): + try: + with connect() as session: + ensure_user(session, username, pwhash, superuser=True) + break + except Exception as x: + print(str(x)) + + # Restart Cassandra with regular config. + nodetool('flush') # Ensure our backdoor updates are flushed. + reconfigure_and_restart_cassandra() + + +def get_cqlshrc_path(): + return os.path.expanduser('~root/.cassandra/cqlshrc') + + +def superuser_username(): + return 'juju_{}'.format(re.subn(r'\W', '_', hookenv.local_unit())[0]) + + +def superuser_credentials(): + '''Return (username, password) to connect to the Cassandra superuser. + + The credentials are persisted in the root user's cqlshrc file, + making them easily accessible to the command line tools. + ''' + cqlshrc_path = get_cqlshrc_path() + cqlshrc = configparser.ConfigParser(interpolation=None) + cqlshrc.read([cqlshrc_path]) + + username = superuser_username() + + try: + section = cqlshrc['authentication'] + # If there happened to be an existing cqlshrc file, it might + # contain invalid credentials. Ignore them. 
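+        # A mismatched username means the credentials belong to someone
+        # else; fall through and regenerate them below.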
+ if section['username'] == username: + return section['username'], section['password'] + except KeyError: + hookenv.log('Generating superuser credentials into {}'.format( + cqlshrc_path)) + + config = hookenv.config() + + password = host.pwgen() + + hookenv.log('Generated username {}'.format(username)) + + # We set items separately, rather than together, so that we have a + # defined order for the ConfigParser to preserve and the tests to + # rely on. + cqlshrc.setdefault('authentication', {}) + cqlshrc['authentication']['username'] = username + cqlshrc['authentication']['password'] = password + cqlshrc.setdefault('connection', {}) + cqlshrc['connection']['hostname'] = hookenv.unit_public_ip() + if get_cassandra_version().startswith('2.0'): + cqlshrc['connection']['port'] = str(config['rpc_port']) + else: + cqlshrc['connection']['port'] = str(config['native_transport_port']) + + ini = io.StringIO() + cqlshrc.write(ini) + host.mkdir(os.path.dirname(cqlshrc_path), perms=0o700) + host.write_file(cqlshrc_path, ini.getvalue().encode('UTF-8'), perms=0o400) + + return username, password + + +def emit(*args, **kw): + # Just like print, but with plumbing and mocked out in the test suite. + print(*args, **kw) + sys.stdout.flush() + + +def nodetool(*cmd, timeout=120): + cmd = ['nodetool'] + [str(i) for i in cmd] + i = 0 + until = time.time() + timeout + for _ in backoff('nodetool to work'): + i += 1 + try: + if timeout is not None: + timeout = max(0, until - time.time()) + raw = subprocess.check_output(cmd, universal_newlines=True, + timeout=timeout, + stderr=subprocess.STDOUT) + + # Work around CASSANDRA-8776. + if 'status' in cmd and 'Error:' in raw: + hookenv.log('Error detected but nodetool returned success.', + WARNING) + raise subprocess.CalledProcessError(99, cmd, raw) + + hookenv.log('{} succeeded'.format(' '.join(cmd)), DEBUG) + out = raw.expandtabs() + emit(out) + return out + + except subprocess.CalledProcessError as x: + if i > 1: + emit(x.output.expandtabs()) # Expand tabs for juju debug-log. + if not is_cassandra_running(): + status_set('blocked', + 'Cassandra has unexpectedly shutdown') + raise SystemExit(0) + if time.time() >= until: + raise + + +def num_nodes(): + return len(get_bootstrapped_ips()) + + +def read_cassandra_yaml(): + cassandra_yaml_path = get_cassandra_yaml_file() + with open(cassandra_yaml_path, 'rb') as f: + return yaml.safe_load(f) + + +@logged +def write_cassandra_yaml(cassandra_yaml): + cassandra_yaml_path = get_cassandra_yaml_file() + host.write_file(cassandra_yaml_path, + yaml.safe_dump(cassandra_yaml).encode('UTF-8')) + + +def configure_cassandra_yaml(overrides={}, seeds=None): + cassandra_yaml_path = get_cassandra_yaml_file() + config = hookenv.config() + + maybe_backup(cassandra_yaml_path) # Its comments may be useful. + + cassandra_yaml = read_cassandra_yaml() + + # Most options just copy from config.yaml keys with the same name. + # Using the same name is preferred to match the actual Cassandra + # documentation. + simple_config_keys = ['cluster_name', 'num_tokens', + 'partitioner', 'authorizer', 'authenticator', + 'compaction_throughput_mb_per_sec', + 'stream_throughput_outbound_megabits_per_sec', + 'tombstone_warn_threshold', + 'tombstone_failure_threshold', + 'native_transport_port', 'rpc_port', + 'storage_port', 'ssl_storage_port'] + cassandra_yaml.update((k, config[k]) for k in simple_config_keys) + + seeds = ','.join(seeds or get_seed_ips()) # Don't include whitespace! 
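+    # Cassandra parses the seed list as a plain comma-delimited string;
+    # stray whitespace could end up embedded in the addresses.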
+    hookenv.log('Configuring seeds as {!r}'.format(seeds), DEBUG)
+    cassandra_yaml['seed_provider'][0]['parameters'][0]['seeds'] = seeds
+
+    cassandra_yaml['listen_address'] = hookenv.unit_private_ip()
+    cassandra_yaml['rpc_address'] = '0.0.0.0'
+    if not get_cassandra_version().startswith('2.0'):
+        cassandra_yaml['broadcast_rpc_address'] = hookenv.unit_public_ip()
+
+    dirs = get_all_database_directories()
+    cassandra_yaml.update(dirs)
+
+    # GossipingPropertyFileSnitch is the only snitch recommended for
+    # production. If we allow others, we need to consider how to deal
+    # with the system_auth keyspace replication settings.
+    cassandra_yaml['endpoint_snitch'] = 'GossipingPropertyFileSnitch'
+
+    # Per Bug #1523546 and CASSANDRA-9319, Thrift is disabled by default in
+    # Cassandra 2.2. Ensure it is enabled if rpc_port is non-zero.
+    if int(config['rpc_port']) > 0:
+        cassandra_yaml['start_rpc'] = True
+
+    cassandra_yaml.update(overrides)
+
+    write_cassandra_yaml(cassandra_yaml)
+
+
+def get_pid_from_file(pid_file):
+    try:
+        with open(pid_file, 'r') as f:
+            pid = int(f.read().strip().split()[0])
+            if pid <= 1:
+                raise ValueError('Illegal pid {}'.format(pid))
+            return pid
+    except (ValueError, IndexError) as e:
+        hookenv.log("Invalid PID in {}.".format(pid_file))
+        raise ValueError(e)
+
+
+def is_cassandra_running():
+    pid_file = get_cassandra_pid_file()
+
+    try:
+        for _ in backoff('Cassandra to respond'):
+            # We reload the pid every time, in case it has gone away.
+            # If it goes away, a FileNotFound exception is raised.
+            pid = get_pid_from_file(pid_file)
+
+            # This does not kill the process but checks for its
+            # existence. It raises a ProcessLookupError if the process
+            # is not running.
+            os.kill(pid, 0)
+
+            if subprocess.call(["nodetool", "status"],
+                               stdout=subprocess.DEVNULL,
+                               stderr=subprocess.DEVNULL) == 0:
+                hookenv.log(
+                    "Cassandra PID {} is running and responding".format(pid))
+                return True
+    except FileNotFoundError:
+        hookenv.log("Cassandra is not running. PID file does not exist.")
+        return False
+    except ProcessLookupError:
+        if os.path.exists(pid_file):
+            # File disappeared between reading the PID and checking if
+            # the PID is running.
+            hookenv.log("Cassandra is not running, but pid file exists.",
+                        WARNING)
+        else:
+            hookenv.log("Cassandra is not running. PID file does not exist.")
+        return False
+
+
+def get_auth_keyspace_replication(session):
+    if has_cassandra_version('3.0'):
+        statement = dedent('''\
+            SELECT replication FROM system_schema.keyspaces
+            WHERE keyspace_name='system_auth'
+            ''')
+        r = query(session, statement, ConsistencyLevel.QUORUM)
+        return dict(r[0][0])
+    else:
+        statement = dedent('''\
+            SELECT strategy_options FROM system.schema_keyspaces
+            WHERE keyspace_name='system_auth'
+            ''')
+        r = query(session, statement, ConsistencyLevel.QUORUM)
+        return json.loads(r[0][0])
+
+
+@logged
+def set_auth_keyspace_replication(session, settings):
+    # Live operation, so keep status the same.
+    status_set(hookenv.status_get()[0],
+               'Updating system_auth rf to {!r}'.format(settings))
+    statement = 'ALTER KEYSPACE system_auth WITH REPLICATION = %s'
+    query(session, statement, ConsistencyLevel.ALL, (settings,))
+
+
+@logged
+def repair_auth_keyspace():
+    # Repair takes a long time, and may need to be retried due to 'snapshot
+    # creation' errors, but should certainly complete within an hour since
+    # the keyspace is tiny.
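+    # The nodetool() helper retries failed invocations itself until the
+    # timeout passed below expires.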
+    status_set(hookenv.status_get()[0],
+               'Repairing system_auth keyspace')
+    nodetool('repair', 'system_auth', timeout=3600)
+
+
+def is_bootstrapped(unit=None):
+    '''Return True if the node has already bootstrapped into the cluster.'''
+    if unit is None or unit == hookenv.local_unit():
+        return hookenv.config().get('bootstrapped', False)
+    elif coordinator.relid:
+        return bool(hookenv.relation_get(rid=coordinator.relid,
+                                         unit=unit).get('bootstrapped'))
+    else:
+        return False
+
+
+def set_bootstrapped():
+    # We need to store this flag in two locations. The peer relation,
+    # so peers can see it, and local state, for when we haven't joined
+    # the peer relation yet. actions.publish_bootstrapped_flag()
+    # calls this method again when necessary to ensure that state is
+    # propagated if/when the peer relation is joined.
+    config = hookenv.config()
+    config['bootstrapped'] = True
+    if coordinator.relid is not None:
+        hookenv.relation_set(coordinator.relid, bootstrapped="1")
+    if config.changed('bootstrapped'):
+        hookenv.log('Bootstrapped')
+    else:
+        hookenv.log('Already bootstrapped')
+
+
+def get_bootstrapped():
+    units = [hookenv.local_unit()]
+    if coordinator.relid is not None:
+        units.extend(hookenv.related_units(coordinator.relid))
+    return set([unit for unit in units if is_bootstrapped(unit)])
+
+
+def get_bootstrapped_ips():
+    return set([unit_to_ip(unit) for unit in get_bootstrapped()])
+
+
+def unit_to_ip(unit):
+    if unit is None or unit == hookenv.local_unit():
+        return hookenv.unit_private_ip()
+    elif coordinator.relid:
+        pa = hookenv.relation_get(rid=coordinator.relid,
+                                  unit=unit).get('private-address')
+        return hookenv._ensure_ip(pa)
+    else:
+        return None
+
+
+def get_node_status():
+    '''Return the Cassandra node status.
+
+    May be NORMAL, JOINING, DECOMMISSIONED etc., or None if we can't tell.
+    '''
+    if not is_cassandra_running():
+        return None
+    raw = nodetool('netstats')
+    m = re.search(r'(?m)^Mode:\s+(\w+)$', raw)
+    if m is None:
+        return None
+    return m.group(1).upper()
+
+
+def is_decommissioned():
+    status = get_node_status()
+    if status in ('DECOMMISSIONED', 'LEAVING'):
+        hookenv.log('This node is {}'.format(status), WARNING)
+        return True
+    return False
+
+
+@logged
+def emit_describe_cluster():
+    '''Run nodetool describecluster for the logs.'''
+    nodetool('describecluster')  # Implicit emit
+
+
+@logged
+def emit_status():
+    '''Run 'nodetool status' for the logs.'''
+    nodetool('status')  # Implicit emit
+
+
+@logged
+def emit_netstats():
+    '''Run 'nodetool netstats' for the logs.'''
+    nodetool('netstats')  # Implicit emit
+
+
+def emit_cluster_info():
+    emit_describe_cluster()
+    emit_status()
+    emit_netstats()
+
+
+# FOR CHARMHELPERS (and think of a better name)
+def week_spread(unit_num):
+    '''Pick a time for a unit's weekly job.
+
+    Jobs are spread out evenly throughout the week as best we can.
+    The chosen time only depends on the unit number, and does not change
+    if other units are added and removed; while the chosen time will not
+    be perfect, we don't have to worry about skipping a weekly job if
+    units are added or removed at the wrong moment.
+
+    Returns (dow, hour, minute) suitable for cron.
+    '''
+    def vdc(n, base=2):
+        '''Van der Corput sequence. 0, 0.5, 0.25, 0.75, 0.125, 0.625, ...
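+        Each value mirrors the base-b digits of n about the radix point.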
+
+        http://rosettacode.org/wiki/Van_der_Corput_sequence#Python
+        '''
+        vdc, denom = 0, 1
+        while n:
+            denom *= base
+            n, remainder = divmod(n, base)
+            vdc += remainder / denom
+        return vdc
+    # We could use the vdc() function to distribute jobs evenly throughout
+    # the week, so unit 0 == 0, unit 1 == 3.5 days, unit 2 == 1.75 etc. But
+    # plain modulo for the day of week is easier for humans and what
+    # you expect for 7 units or less.
+    sched_dow = unit_num % 7
+    # We spread time of day so each batch of 7 units gets the same time,
+    # as far spread out from the other batches of 7 units as possible.
+    minutes_in_day = 24 * 60
+    sched = timedelta(minutes=int(minutes_in_day * vdc(unit_num // 7)))
+    sched_hour = sched.seconds // (60 * 60)
+    sched_minute = sched.seconds // 60 - sched_hour * 60
+    return (sched_dow, sched_hour, sched_minute)
+
+
+# FOR CHARMHELPERS. This should be a constant in nrpe.py
+def local_plugins_dir():
+    return '/usr/local/lib/nagios/plugins'
+
+
+def leader_ping():
+    '''Make a change in the leader settings, waking the non-leaders.'''
+    assert hookenv.is_leader()
+    last = int(hookenv.leader_get('ping') or 0)
+    hookenv.leader_set(ping=str(last + 1))
+
+
+def get_unit_superusers():
+    '''Return the set of units that have had their superuser accounts created.
+    '''
+    raw = hookenv.leader_get('superusers')
+    return set(json.loads(raw or '[]'))
+
+
+def set_unit_superusers(superusers):
+    hookenv.leader_set(superusers=json.dumps(sorted(superusers)))
+
+
+def status_set(state, message):
+    '''Set the unit status and log a message.'''
+    hookenv.status_set(state, message)
+    hookenv.log('{} unit state: {}'.format(state, message))
+
+
+def service_status_set(state, message):
+    '''Set the service status and log a message.'''
+    subprocess.check_call(['status-set', '--service', state, message])
+    hookenv.log('{} service state: {}'.format(state, message))
+
+
+def get_service_name(relid):
+    '''Return the service name for the other end of relid.'''
+    units = hookenv.related_units(relid)
+    if units:
+        return units[0].split('/', 1)[0]
+    else:
+        return None
+
+
+def peer_relid():
+    return coordinator.relid
+
+
+@logged
+def set_active():
+    '''Set happy state'''
+    if hookenv.unit_private_ip() in get_seed_ips():
+        msg = 'Live seed'
+    else:
+        msg = 'Live node'
+    status_set('active', msg)
+
+    if hookenv.is_leader():
+        n = num_nodes()
+        if n == 1:
+            n = 'Single'
+        service_status_set('active', '{} node cluster'.format(n))
+
+
+def update_hosts_file(hosts_file, hosts_map):
+    """Older versions of Cassandra need to resolve their own hostname."""
+    with open(hosts_file, 'r') as hosts:
+        lines = hosts.readlines()
+
+    newlines = []
+    for ip, hostname in hosts_map.items():
+        if not ip or not hostname:
+            continue
+
+        keepers = []
+        for line in lines:
+            _line = line.split()
+            if len(_line) < 2 or not (_line[0] == ip or hostname in _line[1:]):
+                keepers.append(line)
+            else:
+                hookenv.log('Marking line {!r} for update or removal'
+                            ''.format(line.strip()), level=DEBUG)
+
+        lines = keepers
+        newlines.append('{} {}\n'.format(ip, hostname))
+
+    lines += newlines
+
+    with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
+        with open(tmpfile.name, 'w') as hosts:
+            for line in lines:
+                hosts.write(line)
+
+    os.rename(tmpfile.name, hosts_file)
+    os.chmod(hosts_file, 0o644)
-- 
cgit 1.2.3-korg