diff options
Diffstat (limited to 'charms/trusty/cassandra/hooks/helpers.py')
-rw-r--r-- | charms/trusty/cassandra/hooks/helpers.py | 1084 |
1 files changed, 0 insertions, 1084 deletions
diff --git a/charms/trusty/cassandra/hooks/helpers.py b/charms/trusty/cassandra/hooks/helpers.py deleted file mode 100644 index b86a6b1..0000000 --- a/charms/trusty/cassandra/hooks/helpers.py +++ /dev/null @@ -1,1084 +0,0 @@ -# Copyright 2015 Canonical Ltd. -# -# This file is part of the Cassandra Charm for Juju. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3, as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranties of -# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR -# PURPOSE. See the GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -import configparser -from contextlib import contextmanager -from datetime import timedelta -from distutils.version import LooseVersion -import errno -from functools import wraps -import io -import json -import os.path -import re -import shutil -import subprocess -import sys -import tempfile -from textwrap import dedent -import time - -import bcrypt -from cassandra import ConsistencyLevel -import cassandra.auth -import cassandra.cluster -import cassandra.query -import yaml - -from charmhelpers.core import hookenv, host -from charmhelpers.core.hookenv import DEBUG, ERROR, WARNING -from charmhelpers import fetch - -from coordinator import coordinator - - -RESTART_TIMEOUT = 600 - - -def logged(func): - @wraps(func) - def wrapper(*args, **kw): - hookenv.log("* Helper {}/{}".format(hookenv.hook_name(), - func.__name__)) - return func(*args, **kw) - return wrapper - - -def backoff(what_for, max_pause=60): - i = 0 - while True: - yield True - i += 1 - pause = min(max_pause, 2 ** i) - time.sleep(pause) - if pause > 10: - hookenv.log('Recheck {} for {}'.format(i, what_for)) - - -# FOR CHARMHELPERS -@contextmanager -def autostart_disabled(services=None, _policy_rc='/usr/sbin/policy-rc.d'): - '''Tell well behaved Debian packages to not start services when installed. - ''' - script = ['#!/bin/sh'] - if services is not None: - for service in services: - script.append( - 'if [ "$1" = "{}" ]; then exit 101; fi'.format(service)) - script.append('exit 0') - else: - script.append('exit 101') # By default, all daemons disabled. - try: - if os.path.exists(_policy_rc): - shutil.move(_policy_rc, "{}-orig".format(_policy_rc)) - host.write_file(_policy_rc, '\n'.join(script).encode('ASCII'), - perms=0o555) - yield - finally: - os.unlink(_policy_rc) - if os.path.exists("{}-orig".format(_policy_rc)): - shutil.move("{}-orig".format(_policy_rc), _policy_rc) - - -# FOR CHARMHELPERS -@logged -def install_packages(packages): - packages = list(packages) - if hookenv.config('extra_packages'): - packages.extend(hookenv.config('extra_packages').split()) - packages = fetch.filter_installed_packages(packages) - if packages: - # The DSE packages are huge, so this might take some time. - status_set('maintenance', 'Installing packages') - with autostart_disabled(['cassandra']): - fetch.apt_install(packages, fatal=True) - - -# FOR CHARMHELPERS -@logged -def ensure_package_status(packages): - config_dict = hookenv.config() - - package_status = config_dict['package_status'] - - if package_status not in ['install', 'hold']: - raise RuntimeError("package_status must be 'install' or 'hold', " - "not {!r}".format(package_status)) - - selections = [] - for package in packages: - selections.append('{} {}\n'.format(package, package_status)) - dpkg = subprocess.Popen(['dpkg', '--set-selections'], - stdin=subprocess.PIPE) - dpkg.communicate(input=''.join(selections).encode('US-ASCII')) - - -def get_seed_ips(): - '''Return the set of seed ip addresses. - - We use ip addresses rather than unit names, as we may need to use - external seed ips at some point. - ''' - return set((hookenv.leader_get('seeds') or '').split(',')) - - -def actual_seed_ips(): - '''Return the seeds currently in cassandra.yaml''' - cassandra_yaml = read_cassandra_yaml() - s = cassandra_yaml['seed_provider'][0]['parameters'][0]['seeds'] - return set(s.split(',')) - - -def get_database_directory(config_path): - '''Convert a database path from the service config to an absolute path. - - Entries in the config file may be absolute, relative to - /var/lib/cassandra, or relative to the mountpoint. - ''' - import relations - storage = relations.StorageRelation() - if storage.mountpoint: - root = os.path.join(storage.mountpoint, 'cassandra') - else: - root = '/var/lib/cassandra' - return os.path.join(root, config_path) - - -def ensure_database_directory(config_path): - '''Create the database directory if it doesn't exist, resetting - ownership and other settings while we are at it. - - Returns the absolute path. - ''' - absdir = get_database_directory(config_path) - - # Work around Bug #1427150 by ensuring components of the path are - # created with the required permissions, if necessary. - component = os.sep - for p in absdir.split(os.sep)[1:-1]: - component = os.path.join(component, p) - if not os.path.exists(p): - host.mkdir(component) - assert component == os.path.split(absdir)[0] - host.mkdir(absdir, owner='cassandra', group='cassandra', perms=0o750) - return absdir - - -def get_all_database_directories(): - config = hookenv.config() - dirs = dict( - data_file_directories=[get_database_directory(d) - for d in (config['data_file_directories'] or - 'data').split()], - commitlog_directory=get_database_directory( - config['commitlog_directory'] or 'commitlog'), - saved_caches_directory=get_database_directory( - config['saved_caches_directory'] or 'saved_caches')) - if has_cassandra_version('3.0'): - # Not yet configurable. Make configurable with Juju native storage. - dirs['hints_directory'] = get_database_directory('hints') - return dirs - - -def mountpoint(path): - '''Return the mountpoint that path exists on.''' - path = os.path.realpath(path) - while path != '/' and not os.path.ismount(path): - path = os.path.dirname(path) - return path - - -# FOR CHARMHELPERS -def is_lxc(): - '''Return True if we are running inside an LXC container.''' - with open('/proc/1/cgroup', 'r') as f: - return ':/lxc/' in f.readline() - - -# FOR CHARMHELPERS -def set_io_scheduler(io_scheduler, directory): - '''Set the block device io scheduler.''' - - assert os.path.isdir(directory) - - # The block device regex may be a tad simplistic. - block_regex = re.compile('\/dev\/([a-z]*)', re.IGNORECASE) - - output = subprocess.check_output(['df', directory], - universal_newlines=True) - - if not is_lxc(): - hookenv.log("Setting block device of {} to IO scheduler {}" - "".format(directory, io_scheduler)) - try: - block_dev = re.findall(block_regex, output)[0] - except IndexError: - hookenv.log("Unable to locate block device of {} (in container?)" - "".format(directory)) - return - sys_file = os.path.join("/", "sys", "block", block_dev, - "queue", "scheduler") - try: - host.write_file(sys_file, io_scheduler.encode('ascii'), - perms=0o644) - except OSError as e: - if e.errno == errno.EACCES: - hookenv.log("Got Permission Denied trying to set the " - "IO scheduler at {}. We may be in an LXC. " - "Exiting gracefully".format(sys_file), - WARNING) - elif e.errno == errno.ENOENT: - hookenv.log("Got no such file or directory trying to " - "set the IO scheduler at {}. It may be " - "this is an LXC, the device name is as " - "yet unknown to the charm, or LVM/RAID is " - "hiding the underlying device name. " - "Exiting gracefully".format(sys_file), - WARNING) - else: - raise e - else: - # Make no change if we are in an LXC - hookenv.log("In an LXC. Cannot set io scheduler {}" - "".format(io_scheduler)) - - -# FOR CHARMHELPERS -def recursive_chown(directory, owner="root", group="root"): - '''Change ownership of all files and directories in 'directory'. - - Ownership of 'directory' is also reset. - ''' - shutil.chown(directory, owner, group) - for root, dirs, files in os.walk(directory): - for dirname in dirs: - shutil.chown(os.path.join(root, dirname), owner, group) - for filename in files: - shutil.chown(os.path.join(root, filename), owner, group) - - -def maybe_backup(path): - '''Copy a file to file.orig, if file.orig does not already exist.''' - backup_path = path + '.orig' - if not os.path.exists(backup_path): - with open(path, 'rb') as f: - host.write_file(backup_path, f.read(), perms=0o600) - - -# FOR CHARMHELPERS -def get_package_version(package): - cache = fetch.apt_cache() - if package not in cache: - return None - pkgver = cache[package].current_ver - if pkgver is not None: - return pkgver.ver_str - return None - - -def get_jre(): - # DataStax Enterprise requires the Oracle JRE. - if get_cassandra_edition() == 'dse': - return 'oracle' - - config = hookenv.config() - jre = config['jre'].lower() - if jre not in ('openjdk', 'oracle'): - hookenv.log('Unknown JRE {!r} specified. Using OpenJDK'.format(jre), - ERROR) - jre = 'openjdk' - return jre - - -def get_cassandra_edition(): - config = hookenv.config() - edition = config['edition'].lower() - if edition not in ('community', 'dse'): - hookenv.log('Unknown edition {!r}. Using community.'.format(edition), - ERROR) - edition = 'community' - return edition - - -def get_cassandra_service(): - '''Cassandra upstart service''' - if get_cassandra_edition() == 'dse': - return 'dse' - return 'cassandra' - - -def get_cassandra_version(): - if get_cassandra_edition() == 'dse': - dse_ver = get_package_version('dse-full') - if not dse_ver: - return None - elif LooseVersion(dse_ver) >= LooseVersion('5.0'): - return '3.0' - elif LooseVersion(dse_ver) >= LooseVersion('4.7'): - return '2.1' - else: - return '2.0' - return get_package_version('cassandra') - - -def has_cassandra_version(minimum_ver): - cassandra_version = get_cassandra_version() - assert cassandra_version is not None, 'Cassandra package not yet installed' - return LooseVersion(cassandra_version) >= LooseVersion(minimum_ver) - - -def get_cassandra_config_dir(): - if get_cassandra_edition() == 'dse': - return '/etc/dse/cassandra' - else: - return '/etc/cassandra' - - -def get_cassandra_yaml_file(): - return os.path.join(get_cassandra_config_dir(), "cassandra.yaml") - - -def get_cassandra_env_file(): - return os.path.join(get_cassandra_config_dir(), "cassandra-env.sh") - - -def get_cassandra_rackdc_file(): - return os.path.join(get_cassandra_config_dir(), - "cassandra-rackdc.properties") - - -def get_cassandra_pid_file(): - edition = get_cassandra_edition() - if edition == 'dse': - pid_file = "/var/run/dse/dse.pid" - else: - pid_file = "/var/run/cassandra/cassandra.pid" - return pid_file - - -def get_cassandra_packages(): - edition = get_cassandra_edition() - if edition == 'dse': - packages = set(['dse-full']) - else: - packages = set(['cassandra']) # 'cassandra-tools' - - packages.add('ntp') - packages.add('run-one') - packages.add('netcat') - - jre = get_jre() - if jre == 'oracle': - # We can't use a packaged version of the Oracle JRE, as we - # are not allowed to bypass Oracle's click through license - # agreement. - pass - else: - # NB. OpenJDK 8 not available in trusty. This needs to come - # from a PPA or some other configured source. - packages.add('openjdk-8-jre-headless') - - return packages - - -@logged -def stop_cassandra(): - if is_cassandra_running(): - hookenv.log('Shutting down Cassandra') - host.service_stop(get_cassandra_service()) - if is_cassandra_running(): - hookenv.status_set('blocked', 'Cassandra failed to shut down') - raise SystemExit(0) - - -@logged -def start_cassandra(): - if is_cassandra_running(): - return - - actual_seeds = sorted(actual_seed_ips()) - assert actual_seeds, 'Attempting to start cassandra with empty seed list' - hookenv.config()['configured_seeds'] = actual_seeds - - if is_bootstrapped(): - status_set('maintenance', - 'Starting Cassandra with seeds {!r}' - .format(','.join(actual_seeds))) - else: - status_set('maintenance', - 'Bootstrapping with seeds {}' - .format(','.join(actual_seeds))) - - host.service_start(get_cassandra_service()) - - # Wait for Cassandra to actually start, or abort. - timeout = time.time() + RESTART_TIMEOUT - while time.time() < timeout: - if is_cassandra_running(): - return - time.sleep(1) - status_set('blocked', 'Cassandra failed to start') - raise SystemExit(0) - - -@logged -def reconfigure_and_restart_cassandra(overrides={}): - stop_cassandra() - configure_cassandra_yaml(overrides) - start_cassandra() - - -@logged -def remount_cassandra(): - '''If a new mountpoint is ready, migrate data across to it.''' - assert not is_cassandra_running() # Guard against data loss. - import relations - storage = relations.StorageRelation() - if storage.needs_remount(): - status_set('maintenance', 'Migrating data to new mountpoint') - hookenv.config()['bootstrapped_into_cluster'] = False - if storage.mountpoint is None: - hookenv.log('External storage AND DATA gone. ' - 'Reverting to local storage. ' - 'In danger of resurrecting old data. ', - WARNING) - else: - storage.migrate('/var/lib/cassandra', 'cassandra') - root = os.path.join(storage.mountpoint, 'cassandra') - os.chmod(root, 0o750) - - -@logged -def ensure_database_directories(): - '''Ensure that directories Cassandra expects to store its data in exist.''' - # Guard against changing perms on a running db. Although probably - # harmless, it causes shutil.chown() to fail. - assert not is_cassandra_running() - db_dirs = get_all_database_directories() - ensure_database_directory(db_dirs['commitlog_directory']) - ensure_database_directory(db_dirs['saved_caches_directory']) - if 'hints_directory' in db_dirs: - ensure_database_directory(db_dirs['hints_directory']) - for db_dir in db_dirs['data_file_directories']: - ensure_database_directory(db_dir) - - -CONNECT_TIMEOUT = 10 - - -@contextmanager -def connect(username=None, password=None, timeout=CONNECT_TIMEOUT, - auth_timeout=CONNECT_TIMEOUT): - # We pull the currently configured listen address and port from the - # yaml, rather than the service configuration, as it may have been - # overridden. - cassandra_yaml = read_cassandra_yaml() - address = cassandra_yaml['rpc_address'] - if address == '0.0.0.0': - address = 'localhost' - port = cassandra_yaml['native_transport_port'] - - if username is None or password is None: - username, password = superuser_credentials() - - auth = hookenv.config()['authenticator'] - if auth == 'AllowAllAuthenticator': - auth_provider = None - else: - auth_provider = cassandra.auth.PlainTextAuthProvider(username=username, - password=password) - - # Although we specify a reconnection_policy, it does not apply to - # the initial connection so we retry in a loop. - start = time.time() - until = start + timeout - auth_until = start + auth_timeout - while True: - cluster = cassandra.cluster.Cluster([address], port=port, - auth_provider=auth_provider) - try: - session = cluster.connect() - session.default_timeout = timeout - break - except cassandra.cluster.NoHostAvailable as x: - cluster.shutdown() - now = time.time() - # If every node failed auth, reraise one of the - # AuthenticationFailed exceptions. Unwrapping the exception - # means call sites don't have to sniff the exception bundle. - # We don't retry on auth fails; this method should not be - # called if the system_auth data is inconsistent. - auth_fails = [af for af in x.errors.values() - if isinstance(af, cassandra.AuthenticationFailed)] - if auth_fails: - if now > auth_until: - raise auth_fails[0] - if now > until: - raise - time.sleep(1) - try: - yield session - finally: - cluster.shutdown() - - -QUERY_TIMEOUT = 60 - - -def query(session, statement, consistency_level, args=None): - q = cassandra.query.SimpleStatement(statement, - consistency_level=consistency_level) - - until = time.time() + QUERY_TIMEOUT - for _ in backoff('query to execute'): - try: - return session.execute(q, args) - except Exception: - if time.time() > until: - raise - - -def encrypt_password(password): - return bcrypt.hashpw(password, bcrypt.gensalt()) - - -@logged -def ensure_user(session, username, encrypted_password, superuser=False): - '''Create the DB user if it doesn't already exist & reset the password.''' - auth = hookenv.config()['authenticator'] - if auth == 'AllowAllAuthenticator': - return # No authentication means we cannot create users - - if superuser: - hookenv.log('Creating SUPERUSER {}'.format(username)) - else: - hookenv.log('Creating user {}'.format(username)) - if has_cassandra_version('2.2'): - query(session, - 'INSERT INTO system_auth.roles ' - '(role, can_login, is_superuser, salted_hash) ' - 'VALUES (%s, TRUE, %s, %s)', - ConsistencyLevel.ALL, - (username, superuser, encrypted_password)) - else: - query(session, - 'INSERT INTO system_auth.users (name, super) VALUES (%s, %s)', - ConsistencyLevel.ALL, (username, superuser)) - query(session, - 'INSERT INTO system_auth.credentials (username, salted_hash) ' - 'VALUES (%s, %s)', - ConsistencyLevel.ALL, (username, encrypted_password)) - - -@logged -def create_unit_superuser_hard(): - '''Create or recreate the unit's superuser account. - - This method is used when there are no known superuser credentials - to use. We restart the node using the AllowAllAuthenticator and - insert our credentials directly into the system_auth keyspace. - ''' - username, password = superuser_credentials() - pwhash = encrypt_password(password) - hookenv.log('Creating unit superuser {}'.format(username)) - - # Restart cassandra without authentication & listening on localhost. - reconfigure_and_restart_cassandra( - dict(authenticator='AllowAllAuthenticator', rpc_address='localhost')) - for _ in backoff('superuser creation'): - try: - with connect() as session: - ensure_user(session, username, pwhash, superuser=True) - break - except Exception as x: - print(str(x)) - - # Restart Cassandra with regular config. - nodetool('flush') # Ensure our backdoor updates are flushed. - reconfigure_and_restart_cassandra() - - -def get_cqlshrc_path(): - return os.path.expanduser('~root/.cassandra/cqlshrc') - - -def superuser_username(): - return 'juju_{}'.format(re.subn(r'\W', '_', hookenv.local_unit())[0]) - - -def superuser_credentials(): - '''Return (username, password) to connect to the Cassandra superuser. - - The credentials are persisted in the root user's cqlshrc file, - making them easily accessible to the command line tools. - ''' - cqlshrc_path = get_cqlshrc_path() - cqlshrc = configparser.ConfigParser(interpolation=None) - cqlshrc.read([cqlshrc_path]) - - username = superuser_username() - - try: - section = cqlshrc['authentication'] - # If there happened to be an existing cqlshrc file, it might - # contain invalid credentials. Ignore them. - if section['username'] == username: - return section['username'], section['password'] - except KeyError: - hookenv.log('Generating superuser credentials into {}'.format( - cqlshrc_path)) - - config = hookenv.config() - - password = host.pwgen() - - hookenv.log('Generated username {}'.format(username)) - - # We set items separately, rather than together, so that we have a - # defined order for the ConfigParser to preserve and the tests to - # rely on. - cqlshrc.setdefault('authentication', {}) - cqlshrc['authentication']['username'] = username - cqlshrc['authentication']['password'] = password - cqlshrc.setdefault('connection', {}) - cqlshrc['connection']['hostname'] = hookenv.unit_public_ip() - if get_cassandra_version().startswith('2.0'): - cqlshrc['connection']['port'] = str(config['rpc_port']) - else: - cqlshrc['connection']['port'] = str(config['native_transport_port']) - - ini = io.StringIO() - cqlshrc.write(ini) - host.mkdir(os.path.dirname(cqlshrc_path), perms=0o700) - host.write_file(cqlshrc_path, ini.getvalue().encode('UTF-8'), perms=0o400) - - return username, password - - -def emit(*args, **kw): - # Just like print, but with plumbing and mocked out in the test suite. - print(*args, **kw) - sys.stdout.flush() - - -def nodetool(*cmd, timeout=120): - cmd = ['nodetool'] + [str(i) for i in cmd] - i = 0 - until = time.time() + timeout - for _ in backoff('nodetool to work'): - i += 1 - try: - if timeout is not None: - timeout = max(0, until - time.time()) - raw = subprocess.check_output(cmd, universal_newlines=True, - timeout=timeout, - stderr=subprocess.STDOUT) - - # Work around CASSANDRA-8776. - if 'status' in cmd and 'Error:' in raw: - hookenv.log('Error detected but nodetool returned success.', - WARNING) - raise subprocess.CalledProcessError(99, cmd, raw) - - hookenv.log('{} succeeded'.format(' '.join(cmd)), DEBUG) - out = raw.expandtabs() - emit(out) - return out - - except subprocess.CalledProcessError as x: - if i > 1: - emit(x.output.expandtabs()) # Expand tabs for juju debug-log. - if not is_cassandra_running(): - status_set('blocked', - 'Cassandra has unexpectedly shutdown') - raise SystemExit(0) - if time.time() >= until: - raise - - -def num_nodes(): - return len(get_bootstrapped_ips()) - - -def read_cassandra_yaml(): - cassandra_yaml_path = get_cassandra_yaml_file() - with open(cassandra_yaml_path, 'rb') as f: - return yaml.safe_load(f) - - -@logged -def write_cassandra_yaml(cassandra_yaml): - cassandra_yaml_path = get_cassandra_yaml_file() - host.write_file(cassandra_yaml_path, - yaml.safe_dump(cassandra_yaml).encode('UTF-8')) - - -def configure_cassandra_yaml(overrides={}, seeds=None): - cassandra_yaml_path = get_cassandra_yaml_file() - config = hookenv.config() - - maybe_backup(cassandra_yaml_path) # Its comments may be useful. - - cassandra_yaml = read_cassandra_yaml() - - # Most options just copy from config.yaml keys with the same name. - # Using the same name is preferred to match the actual Cassandra - # documentation. - simple_config_keys = ['cluster_name', 'num_tokens', - 'partitioner', 'authorizer', 'authenticator', - 'compaction_throughput_mb_per_sec', - 'stream_throughput_outbound_megabits_per_sec', - 'tombstone_warn_threshold', - 'tombstone_failure_threshold', - 'native_transport_port', 'rpc_port', - 'storage_port', 'ssl_storage_port'] - cassandra_yaml.update((k, config[k]) for k in simple_config_keys) - - seeds = ','.join(seeds or get_seed_ips()) # Don't include whitespace! - hookenv.log('Configuring seeds as {!r}'.format(seeds), DEBUG) - cassandra_yaml['seed_provider'][0]['parameters'][0]['seeds'] = seeds - - cassandra_yaml['listen_address'] = hookenv.unit_private_ip() - cassandra_yaml['rpc_address'] = '0.0.0.0' - if not get_cassandra_version().startswith('2.0'): - cassandra_yaml['broadcast_rpc_address'] = hookenv.unit_public_ip() - - dirs = get_all_database_directories() - cassandra_yaml.update(dirs) - - # GossipingPropertyFileSnitch is the only snitch recommended for - # production. It we allow others, we need to consider how to deal - # with the system_auth keyspace replication settings. - cassandra_yaml['endpoint_snitch'] = 'GossipingPropertyFileSnitch' - - # Per Bug #1523546 and CASSANDRA-9319, Thrift is disabled by default in - # Cassandra 2.2. Ensure it is enabled if rpc_port is non-zero. - if int(config['rpc_port']) > 0: - cassandra_yaml['start_rpc'] = True - - cassandra_yaml.update(overrides) - - write_cassandra_yaml(cassandra_yaml) - - -def get_pid_from_file(pid_file): - try: - with open(pid_file, 'r') as f: - pid = int(f.read().strip().split()[0]) - if pid <= 1: - raise ValueError('Illegal pid {}'.format(pid)) - return pid - except (ValueError, IndexError) as e: - hookenv.log("Invalid PID in {}.".format(pid_file)) - raise ValueError(e) - - -def is_cassandra_running(): - pid_file = get_cassandra_pid_file() - - try: - for _ in backoff('Cassandra to respond'): - # We reload the pid every time, in case it has gone away. - # If it goes away, a FileNotFound exception is raised. - pid = get_pid_from_file(pid_file) - - # This does not kill the process but checks for its - # existence. It raises an ProcessLookupError if the process - # is not running. - os.kill(pid, 0) - - if subprocess.call(["nodetool", "status"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) == 0: - hookenv.log( - "Cassandra PID {} is running and responding".format(pid)) - return True - except FileNotFoundError: - hookenv.log("Cassandra is not running. PID file does not exist.") - return False - except ProcessLookupError: - if os.path.exists(pid_file): - # File disappeared between reading the PID and checking if - # the PID is running. - hookenv.log("Cassandra is not running, but pid file exists.", - WARNING) - else: - hookenv.log("Cassandra is not running. PID file does not exist.") - return False - - -def get_auth_keyspace_replication(session): - if has_cassandra_version('3.0'): - statement = dedent('''\ - SELECT replication FROM system_schema.keyspaces - WHERE keyspace_name='system_auth' - ''') - r = query(session, statement, ConsistencyLevel.QUORUM) - return dict(r[0][0]) - else: - statement = dedent('''\ - SELECT strategy_options FROM system.schema_keyspaces - WHERE keyspace_name='system_auth' - ''') - r = query(session, statement, ConsistencyLevel.QUORUM) - return json.loads(r[0][0]) - - -@logged -def set_auth_keyspace_replication(session, settings): - # Live operation, so keep status the same. - status_set(hookenv.status_get()[0], - 'Updating system_auth rf to {!r}'.format(settings)) - statement = 'ALTER KEYSPACE system_auth WITH REPLICATION = %s' - query(session, statement, ConsistencyLevel.ALL, (settings,)) - - -@logged -def repair_auth_keyspace(): - # Repair takes a long time, and may need to be retried due to 'snapshot - # creation' errors, but should certainly complete within an hour since - # the keyspace is tiny. - status_set(hookenv.status_get()[0], - 'Repairing system_auth keyspace') - nodetool('repair', 'system_auth', timeout=3600) - - -def is_bootstrapped(unit=None): - '''Return True if the node has already bootstrapped into the cluster.''' - if unit is None or unit == hookenv.local_unit(): - return hookenv.config().get('bootstrapped', False) - elif coordinator.relid: - return bool(hookenv.relation_get(rid=coordinator.relid, - unit=unit).get('bootstrapped')) - else: - return False - - -def set_bootstrapped(): - # We need to store this flag in two locations. The peer relation, - # so peers can see it, and local state, for when we haven't joined - # the peer relation yet. actions.publish_bootstrapped_flag() - # calls this method again when necessary to ensure that state is - # propagated # if/when the peer relation is joined. - config = hookenv.config() - config['bootstrapped'] = True - if coordinator.relid is not None: - hookenv.relation_set(coordinator.relid, bootstrapped="1") - if config.changed('bootstrapped'): - hookenv.log('Bootstrapped') - else: - hookenv.log('Already bootstrapped') - - -def get_bootstrapped(): - units = [hookenv.local_unit()] - if coordinator.relid is not None: - units.extend(hookenv.related_units(coordinator.relid)) - return set([unit for unit in units if is_bootstrapped(unit)]) - - -def get_bootstrapped_ips(): - return set([unit_to_ip(unit) for unit in get_bootstrapped()]) - - -def unit_to_ip(unit): - if unit is None or unit == hookenv.local_unit(): - return hookenv.unit_private_ip() - elif coordinator.relid: - pa = hookenv.relation_get(rid=coordinator.relid, - unit=unit).get('private-address') - return hookenv._ensure_ip(pa) - else: - return None - - -def get_node_status(): - '''Return the Cassandra node status. - - May be NORMAL, JOINING, DECOMMISSIONED etc., or None if we can't tell. - ''' - if not is_cassandra_running(): - return None - raw = nodetool('netstats') - m = re.search(r'(?m)^Mode:\s+(\w+)$', raw) - if m is None: - return None - return m.group(1).upper() - - -def is_decommissioned(): - status = get_node_status() - if status in ('DECOMMISSIONED', 'LEAVING'): - hookenv.log('This node is {}'.format(status), WARNING) - return True - return False - - -@logged -def emit_describe_cluster(): - '''Run nodetool describecluster for the logs.''' - nodetool('describecluster') # Implicit emit - - -@logged -def emit_status(): - '''Run 'nodetool status' for the logs.''' - nodetool('status') # Implicit emit - - -@logged -def emit_netstats(): - '''Run 'nodetool netstats' for the logs.''' - nodetool('netstats') # Implicit emit - - -def emit_cluster_info(): - emit_describe_cluster() - emit_status() - emit_netstats() - - -# FOR CHARMHELPERS (and think of a better name) -def week_spread(unit_num): - '''Pick a time for a unit's weekly job. - - Jobs are spread out evenly throughout the week as best we can. - The chosen time only depends on the unit number, and does not change - if other units are added and removed; while the chosen time will not - be perfect, we don't have to worry about skipping a weekly job if - units are added or removed at the wrong moment. - - Returns (dow, hour, minute) suitable for cron. - ''' - def vdc(n, base=2): - '''Van der Corpet sequence. 0, 0.5, 0.25, 0.75, 0.125, 0.625, ... - - http://rosettacode.org/wiki/Van_der_Corput_sequence#Python - ''' - vdc, denom = 0, 1 - while n: - denom *= base - n, remainder = divmod(n, base) - vdc += remainder / denom - return vdc - # We could use the vdc() function to distribute jobs evenly throughout - # the week, so unit 0==0, unit 1==3.5days, unit 2==1.75 etc. But - # plain modulo for the day of week is easier for humans and what - # you expect for 7 units or less. - sched_dow = unit_num % 7 - # We spread time of day so each batch of 7 units gets the same time, - # as far spread out from the other batches of 7 units as possible. - minutes_in_day = 24 * 60 - sched = timedelta(minutes=int(minutes_in_day * vdc(unit_num // 7))) - sched_hour = sched.seconds // (60 * 60) - sched_minute = sched.seconds // 60 - sched_hour * 60 - return (sched_dow, sched_hour, sched_minute) - - -# FOR CHARMHELPERS. This should be a constant in nrpe.py -def local_plugins_dir(): - return '/usr/local/lib/nagios/plugins' - - -def leader_ping(): - '''Make a change in the leader settings, waking the non-leaders.''' - assert hookenv.is_leader() - last = int(hookenv.leader_get('ping') or 0) - hookenv.leader_set(ping=str(last + 1)) - - -def get_unit_superusers(): - '''Return the set of units that have had their superuser accounts created. - ''' - raw = hookenv.leader_get('superusers') - return set(json.loads(raw or '[]')) - - -def set_unit_superusers(superusers): - hookenv.leader_set(superusers=json.dumps(sorted(superusers))) - - -def status_set(state, message): - '''Set the unit status and log a message.''' - hookenv.status_set(state, message) - hookenv.log('{} unit state: {}'.format(state, message)) - - -def service_status_set(state, message): - '''Set the service status and log a message.''' - subprocess.check_call(['status-set', '--service', state, message]) - hookenv.log('{} service state: {}'.format(state, message)) - - -def get_service_name(relid): - '''Return the service name for the other end of relid.''' - units = hookenv.related_units(relid) - if units: - return units[0].split('/', 1)[0] - else: - return None - - -def peer_relid(): - return coordinator.relid - - -@logged -def set_active(): - '''Set happy state''' - if hookenv.unit_private_ip() in get_seed_ips(): - msg = 'Live seed' - else: - msg = 'Live node' - status_set('active', msg) - - if hookenv.is_leader(): - n = num_nodes() - if n == 1: - n = 'Single' - service_status_set('active', '{} node cluster'.format(n)) - - -def update_hosts_file(hosts_file, hosts_map): - """Older versions of Cassandra need own hostname resolution.""" - with open(hosts_file, 'r') as hosts: - lines = hosts.readlines() - - newlines = [] - for ip, hostname in hosts_map.items(): - if not ip or not hostname: - continue - - keepers = [] - for line in lines: - _line = line.split() - if len(_line) < 2 or not (_line[0] == ip or hostname in _line[1:]): - keepers.append(line) - else: - hookenv.log('Marking line {!r} for update or removal' - ''.format(line.strip()), level=DEBUG) - - lines = keepers - newlines.append('{} {}\n'.format(ip, hostname)) - - lines += newlines - - with tempfile.NamedTemporaryFile(delete=False) as tmpfile: - with open(tmpfile.name, 'w') as hosts: - for line in lines: - hosts.write(line) - - os.rename(tmpfile.name, hosts_file) - os.chmod(hosts_file, 0o644) |