From 4faa7f927149a5c4ef7a03523f7bc14523cb9baa Mon Sep 17 00:00:00 2001
From: Stuart Mackie
Date: Fri, 7 Oct 2016 12:24:58 -0700
Subject: Charms for Contrail 3.1 with Mitaka

Change-Id: Id37f3b9743d1974e31fcd7cd9c54be41bb0c47fb
Signed-off-by: Stuart Mackie
---
 charms/trusty/cassandra/hooks/actions.py | 990 +++++++++++++++++++++++++++++++
 1 file changed, 990 insertions(+)
 create mode 100644 charms/trusty/cassandra/hooks/actions.py

diff --git a/charms/trusty/cassandra/hooks/actions.py b/charms/trusty/cassandra/hooks/actions.py
new file mode 100644
index 0000000..8887056
--- /dev/null
+++ b/charms/trusty/cassandra/hooks/actions.py
@@ -0,0 +1,990 @@
+# Copyright 2015 Canonical Ltd.
+#
+# This file is part of the Cassandra Charm for Juju.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranties of
+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from contextlib import closing
+import errno
+from functools import wraps
+import glob
+import os.path
+import re
+import shlex
+import socket
+import subprocess
+from textwrap import dedent
+import time
+import urllib.request
+
+from charmhelpers import fetch
+from charmhelpers.contrib.charmsupport import nrpe
+from charmhelpers.contrib.network import ufw
+from charmhelpers.contrib.templating import jinja
+from charmhelpers.core import hookenv, host
+from charmhelpers.core.fstab import Fstab
+from charmhelpers.core.hookenv import DEBUG, ERROR, WARNING
+
+import cassandra
+
+from coordinator import coordinator
+import helpers
+import relations
+
+
+# These config keys cannot be changed after service deployment.
+UNCHANGEABLE_KEYS = set(['cluster_name', 'datacenter', 'rack', 'edition'])
+
+# If any of these config items are changed, Cassandra needs to be
+# restarted and maybe remounted.
+RESTART_REQUIRED_KEYS = set([
+    'data_file_directories',
+    'commitlog_directory',
+    'saved_caches_directory',
+    'storage_port',
+    'ssl_storage_port',
+    'rpc_port',
+    'native_transport_port',
+    'partitioner',
+    'num_tokens',
+    'max_heap_size',
+    'heap_newsize',
+    'authenticator',
+    'authorizer',
+    'compaction_throughput_mb_per_sec',
+    'stream_throughput_outbound_megabits_per_sec',
+    'tombstone_warn_threshold',
+    'tombstone_failure_threshold',
+    'jre',
+    'private_jre_url'])
+
+ALL_CONFIG_KEYS = UNCHANGEABLE_KEYS.union(RESTART_REQUIRED_KEYS)
+
+
+# All other config items. By maintaining both lists, we can detect if
+# someone forgot to update these lists when they added a new config item.
+RESTART_NOT_REQUIRED_KEYS = set([
+    'extra_packages',
+    'package_status',
+    'install_sources',
+    'install_keys',
+    'http_proxy',
+    'wait_for_storage_broker',
+    'io_scheduler',
+    'nagios_context',
+    'nagios_servicegroups',
+    'nagios_heapchk_warn_pct',
+    'nagios_heapchk_crit_pct',
+    'nagios_disk_warn_pct',
+    'nagios_disk_crit_pct'])
+
+
+def action(func):
+    '''Log and call func, stripping the undesirable servicename argument.
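+
+    (The services framework invokes each of these callbacks with the
+    service name as its first argument; none of the actions here use it,
+    so the wrapper below strips it before delegating.)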
+    '''
+    @wraps(func)
+    def wrapper(servicename, *args, **kw):
+        if hookenv.remote_unit():
+            hookenv.log("** Action {}/{} ({})".format(hookenv.hook_name(),
+                                                      func.__name__,
+                                                      hookenv.remote_unit()))
+        else:
+            hookenv.log("** Action {}/{}".format(hookenv.hook_name(),
+                                                 func.__name__))
+        return func(*args, **kw)
+    return wrapper
+
+
+def leader_only(func):
+    '''Decorated function is only run on the leader.'''
+    @wraps(func)
+    def wrapper(*args, **kw):
+        if hookenv.is_leader():
+            return func(*args, **kw)
+        else:
+            return None
+    return wrapper
+
+
+def authentication(func):
+    '''Decorated function is skipped if authentication is disabled.'''
+    @wraps(func)
+    def wrapper(*args, **kw):
+        auth = hookenv.config()['authenticator']
+        if auth == 'PasswordAuthenticator':
+            return func(*args, **kw)
+        elif auth == 'AllowAllAuthenticator':
+            hookenv.log('Skipped. Authentication disabled.', DEBUG)
+            return None
+        helpers.status_set('blocked', 'Unknown authenticator {}'.format(auth))
+        raise SystemExit(0)
+    return wrapper
+
+
+@action
+def set_proxy():
+    import hooks
+    hooks.set_proxy()
+
+
+@action
+def revert_unchangeable_config():
+    config = hookenv.config()
+
+    # config.previous() only becomes meaningful after the install
+    # hook has run. During the first hook run on a unit, it reports
+    # everything as having None as the previous value.
+    if config._prev_dict is None:
+        return
+
+    for key in UNCHANGEABLE_KEYS:
+        if config.changed(key):
+            previous = config.previous(key)
+            hookenv.log('{} cannot be changed after service deployment. '
+                        'Using original setting {!r}'.format(key, previous),
+                        ERROR)
+            config[key] = previous
+
+
+# FOR CHARMHELPERS
+@action
+def preinstall():
+    '''Preinstallation data_ready hook.'''
+    # Only run the preinstall hooks from the actual install hook.
+    if hookenv.hook_name() == 'install':
+        # Pre-exec
+        pattern = os.path.join(hookenv.charm_dir(),
+                               'exec.d', '*', 'charm-pre-install')
+        for f in sorted(glob.glob(pattern)):
+            if os.path.isfile(f) and os.access(f, os.X_OK):
+                hookenv.log('Running preinstall hook {}'.format(f))
+                subprocess.check_call(['sh', '-c', f])
+            else:
+                hookenv.log('Ignoring preinstall hook {}'.format(f),
+                            WARNING)
+        else:
+            hookenv.log('No preinstall hooks found')
+
+
+# FOR CHARMHELPERS
+@action
+def swapoff(fstab='/etc/fstab'):
+    '''Turn off swapping in the container, permanently.'''
+    # Turn off swap in the current session
+    if helpers.is_lxc():
+        hookenv.log("In an LXC. Not touching swap.")
+        return
+    else:
+        try:
+            subprocess.check_call(['swapoff', '-a'])
+        except Exception as e:
+            hookenv.log("Got an error trying to turn off swapping. {}. "
+                        "We may be in an LXC. Exiting gracefully"
+                        "".format(e), WARNING)
+            return
+
+    # Disable swap permanently
+    with closing(Fstab(fstab)) as fstab:
+        while True:
+            swap_entry = fstab.get_entry_by_attr('filesystem', 'swap')
+            if swap_entry is None:
+                break
+            fstab.remove_entry(swap_entry)
+
+
+# FOR CHARMHELPERS
+@action
+def configure_sources():
+    '''Standard charmhelpers package source configuration.'''
+    config = hookenv.config()
+    if config.changed('install_sources') or config.changed('install_keys'):
+        fetch.configure_sources(True)
+
+
+@action
+def add_implicit_package_signing_keys():
+    # Rather than blindly add these keys, we should sniff
+    # config['install_sources'] for apache.org or datastax.com urls and
+    # add only the appropriate keys.
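+    # A rough sketch of that sniffing (illustrative only; the parsing of
+    # install_sources and the URL substrings below are assumptions, not
+    # behaviour of this charm):
+    #
+    #     wanted = set()
+    #     for source in hookenv.config()['install_sources'].splitlines():
+    #         if 'apache.org' in source:
+    #             wanted.add('apache')
+    #         elif 'datastax.com' in source:
+    #             wanted.add('datastax')
+    #
+    # Until then, both bundled keys are added unconditionally.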
+    for key in ('apache', 'datastax'):
+        path = os.path.join(hookenv.charm_dir(), 'lib', '{}.key'.format(key))
+        subprocess.check_call(['apt-key', 'add', path],
+                              stdin=subprocess.DEVNULL)
+
+
+@action
+def reset_sysctl():
+    '''Configure sysctl settings for Cassandra'''
+    if helpers.is_lxc():
+        hookenv.log("In an LXC. Leaving sysctl unchanged.")
+    else:
+        cassandra_sysctl_file = os.path.join('/', 'etc', 'sysctl.d',
+                                             '99-cassandra.conf')
+        contents = b"vm.max_map_count = 131072\n"
+        try:
+            host.write_file(cassandra_sysctl_file, contents)
+            subprocess.check_call(['sysctl', '-p', cassandra_sysctl_file])
+        except OSError as e:
+            if e.errno == errno.EACCES:
+                hookenv.log("Got Permission Denied trying to set the "
+                            "sysctl settings at {}. We may be in an LXC. "
+                            "Exiting gracefully".format(cassandra_sysctl_file),
+                            WARNING)
+            else:
+                raise
+
+
+@action
+def reset_limits():
+    '''Set /etc/security/limits.d correctly for Ubuntu, so the
+    startup scripts don't emit a spurious warning.
+
+    Per Cassandra documentation, Ubuntu needs some extra
+    twiddling in /etc/security/limits.d. I have no idea why
+    the packages don't do this, since they are already
+    setting limits for the cassandra user correctly. The real
+    bug is that the limits of the user running the startup script
+    are being checked, rather than the limits of the user that will
+    actually run the process.
+    '''
+    contents = dedent('''\
+        # Maintained by Juju
+        root - memlock unlimited
+        root - nofile 100000
+        root - nproc 32768
+        root - as unlimited
+        ubuntu - memlock unlimited
+        ubuntu - nofile 100000
+        ubuntu - nproc 32768
+        ubuntu - as unlimited
+        ''')
+    host.write_file('/etc/security/limits.d/cassandra-charm.conf',
+                    contents.encode('US-ASCII'))
+
+
+@action
+def install_cassandra_packages():
+    helpers.install_packages(helpers.get_cassandra_packages())
+    if helpers.get_jre() != 'oracle':
+        subprocess.check_call(['update-java-alternatives',
+                               '--jre-headless',
+                               '--set', 'java-1.8.0-openjdk-amd64'])
+
+
+@action
+def ensure_cassandra_package_status():
+    helpers.ensure_package_status(helpers.get_cassandra_packages())
+
+
+def _fetch_oracle_jre():
+    config = hookenv.config()
+    url = config.get('private_jre_url', None)
+    if url and config.get('retrieved_jre', None) != url:
+        filename = os.path.join(hookenv.charm_dir(),
+                                'lib', url.split('/')[-1])
+        if not filename.endswith('-linux-x64.tar.gz'):
+            helpers.status_set('blocked',
+                               'Invalid private_jre_url {}'.format(url))
+            raise SystemExit(0)
+        helpers.status_set(hookenv.status_get()[0],
+                           'Downloading Oracle JRE')
+        hookenv.log('Oracle JRE URL is {}'.format(url))
+        urllib.request.urlretrieve(url, filename)
+        config['retrieved_jre'] = url
+
+    pattern = os.path.join(hookenv.charm_dir(),
+                           'lib', 'server-jre-?u*-linux-x64.tar.gz')
+    tarballs = glob.glob(pattern)
+    if not (url or tarballs):
+        helpers.status_set('blocked',
+                           'private_jre_url not set and no local tarballs.')
+        raise SystemExit(0)
+    elif not tarballs:
+        helpers.status_set('blocked',
+                           'Oracle JRE tarball not found ({})'.format(pattern))
+        raise SystemExit(0)
+
+    # Latest tarball by filename/version num. Let's hope they don't hit
+    # 99 (currently at 76).
+    tarball = sorted(tarballs)[-1]
+    return tarball
+
+
+def _install_oracle_jre_tarball(tarball):
+    # Same directory as webupd8 to avoid surprising people, but it could
+    # be anything.
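+    # (Illustrative alternative, not used here: the Java major version
+    # could be parsed from the tarball name with something like
+    # re.search(r'jre-(\d+)u', os.path.basename(tarball)); the substring
+    # test below suffices while only Java 7 and 8 tarballs are expected.)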
+    if 'jre-7u' in str(tarball):
+        dest = '/usr/lib/jvm/java-7-oracle'
+    else:
+        dest = '/usr/lib/jvm/java-8-oracle'
+
+    if not os.path.isdir(dest):
+        host.mkdir(dest)
+
+    jre_exists = os.path.exists(os.path.join(dest, 'bin', 'java'))
+
+    config = hookenv.config()
+
+    # Unpack the latest tarball if necessary.
+    if config.get('oracle_jre_tarball', '') == tarball and jre_exists:
+        hookenv.log('Already installed {}'.format(tarball))
+    else:
+        hookenv.log('Unpacking {}'.format(tarball))
+        subprocess.check_call(['tar', '-xz', '-C', dest,
+                               '--strip-components=1', '-f', tarball])
+        config['oracle_jre_tarball'] = tarball
+
+    # Set alternatives, so /usr/bin/java does what we want.
+    for tool in ['java', 'javac']:
+        tool_path = os.path.join(dest, 'bin', tool)
+        subprocess.check_call(['update-alternatives', '--install',
+                               os.path.join('/usr/bin', tool),
+                               tool, tool_path, '1'])
+        subprocess.check_call(['update-alternatives',
+                               '--set', tool, tool_path])
+
+
+@action
+def install_oracle_jre():
+    if helpers.get_jre() != 'oracle':
+        return
+
+    tarball = _fetch_oracle_jre()
+    _install_oracle_jre_tarball(tarball)
+
+
+@action
+def emit_java_version():
+    # Log the version for posterity. Could be useful since Oracle JRE
+    # security updates are not automated.
+    version = subprocess.check_output(['java', '-version'],
+                                      universal_newlines=True)
+    for line in version.splitlines():
+        hookenv.log('JRE: {}'.format(line))
+
+
+@action
+def emit_meminfo():
+    helpers.emit(subprocess.check_output(['free', '--human'],
+                                         universal_newlines=True))
+
+
+@action
+def configure_cassandra_yaml():
+    helpers.configure_cassandra_yaml()
+
+
+@action
+def configure_cassandra_env():
+    cassandra_env_path = helpers.get_cassandra_env_file()
+    assert os.path.exists(cassandra_env_path)
+
+    helpers.maybe_backup(cassandra_env_path)
+
+    overrides = [
+        ('max_heap_size', re.compile(r'^#?(MAX_HEAP_SIZE)=(.*)$', re.M)),
+        ('heap_newsize', re.compile(r'^#?(HEAP_NEWSIZE)=(.*)$', re.M)),
+        # We don't allow this to be overridden to ensure that tools
+        # will find JMX using the default port.
+        # ('jmx_port', re.compile(r'^#?(JMX_PORT)=(.*)$', re.M)),
+    ]
+
+    with open(cassandra_env_path, 'r') as f:
+        env = f.read()
+
+    config = hookenv.config()
+    for key, regexp in overrides:
+        if config[key]:
+            val = shlex.quote(str(config[key]))
+            env = regexp.sub(r'\g<1>={}'.format(val), env)
+        else:
+            env = regexp.sub(r'#\1=\2', env)
+    host.write_file(cassandra_env_path, env.encode('UTF-8'))
+
+
+@action
+def configure_cassandra_rackdc():
+    config = hookenv.config()
+    datacenter = config['datacenter'].strip()
+    rack = config['rack'].strip() or hookenv.service_name()
+    rackdc_properties = dedent('''\
+        dc={}
+        rack={}
+        ''').format(datacenter, rack)
+    rackdc_path = helpers.get_cassandra_rackdc_file()
+    host.write_file(rackdc_path, rackdc_properties.encode('UTF-8'))
+
+
+def needs_reset_auth_keyspace_replication():
+    '''Guard for reset_auth_keyspace_replication.'''
+    num_nodes = helpers.num_nodes()
+    datacenter = hookenv.config()['datacenter']
+    with helpers.connect() as session:
+        strategy_opts = helpers.get_auth_keyspace_replication(session)
+        rf = int(strategy_opts.get(datacenter, -1))
+        hookenv.log('system_auth rf={!r}'.format(strategy_opts))
+    # If the node count has changed, we should change the rf.
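+    # For example, with three bootstrapped nodes but a stored replication
+    # factor of 2 (or -1 when this datacenter has no entry yet), this
+    # returns True and the coordinated repair below runs.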
+    return rf != num_nodes
+
+
+@leader_only
+@action
+@authentication
+@coordinator.require('repair', needs_reset_auth_keyspace_replication)
+def reset_auth_keyspace_replication():
+    # Cassandra requires you to manually set the replication factor of
+    # the system_auth keyspace, to ensure availability and redundancy.
+    # The recommendation is to set the replication factor so that every
+    # node has a copy.
+    num_nodes = helpers.num_nodes()
+    datacenter = hookenv.config()['datacenter']
+    with helpers.connect() as session:
+        strategy_opts = helpers.get_auth_keyspace_replication(session)
+        rf = int(strategy_opts.get(datacenter, -1))
+        hookenv.log('system_auth rf={!r}'.format(strategy_opts))
+        if rf != num_nodes:
+            strategy_opts['class'] = 'NetworkTopologyStrategy'
+            strategy_opts[datacenter] = num_nodes
+            if 'replication_factor' in strategy_opts:
+                del strategy_opts['replication_factor']
+            helpers.set_auth_keyspace_replication(session, strategy_opts)
+            if rf < num_nodes:
+                # Increasing rf, need to run repair.
+                helpers.repair_auth_keyspace()
+    helpers.set_active()
+
+
+@action
+def store_unit_private_ip():
+    '''Store the unit's private ip address, so we can tell if it changes.'''
+    hookenv.config()['unit_private_ip'] = hookenv.unit_private_ip()
+
+
+def needs_restart():
+    '''Return True if Cassandra is not running or needs to be restarted.'''
+    if helpers.is_decommissioned():
+        # Decommissioned nodes are never restarted. They remain up
+        # telling everyone they are decommissioned.
+        helpers.status_set('blocked', 'Decommissioned node')
+        return False
+
+    if not helpers.is_cassandra_running():
+        if helpers.is_bootstrapped():
+            helpers.status_set('waiting', 'Waiting for permission to start')
+        else:
+            helpers.status_set('waiting',
+                               'Waiting for permission to bootstrap')
+        return True
+
+    config = hookenv.config()
+
+    # If our IP address has changed, we need to restart.
+    if config.changed('unit_private_ip'):
+        helpers.status_set('waiting', 'IP address changed. '
+                           'Waiting for restart permission.')
+        return True
+
+    # If the directory paths have changed, we need to migrate data
+    # during a restart.
+    storage = relations.StorageRelation()
+    if storage.needs_remount():
+        helpers.status_set(hookenv.status_get()[0],
+                           'New mounts. Waiting for restart permission')
+        return True
+
+    # If any of these config items changed, a restart is required.
+    config_changed = False
+    for key in RESTART_REQUIRED_KEYS:
+        if config.changed(key):
+            hookenv.log('{} changed. Restart required.'.format(key))
+            config_changed = True
+    if config_changed:
+        helpers.status_set(hookenv.status_get()[0],
+                           'Config changes. '
+                           'Waiting for restart permission.')
+        return True
+
+    # If we have new seeds, we should restart.
+    new_seeds = helpers.get_seed_ips()
+    if config.get('configured_seeds') != sorted(new_seeds):
+        old_seeds = set(config.previous('configured_seeds') or [])
+        changed = old_seeds.symmetric_difference(new_seeds)
+        # We don't care about the local node in the changes.
+        changed.discard(hookenv.unit_private_ip())
+        if changed:
+            helpers.status_set(hookenv.status_get()[0],
+                               'Updated seeds {!r}. '
+                               'Waiting for restart permission.'
+                               ''.format(new_seeds))
+            return True
+
+    hookenv.log('Restart not required')
+    return False
+
+
+@action
+@coordinator.require('restart', needs_restart)
+def maybe_restart():
+    '''Restart sequence.
+
+    If a restart is needed, shut down Cassandra, perform all pending
+    operations that cannot be done while Cassandra is live, and
+    restart it.
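+
+    The pending operations here are remounting any newly configured
+    storage and creating missing database directories, both of which
+    must happen while Cassandra is down.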
+    '''
+    helpers.status_set('maintenance', 'Stopping Cassandra')
+    helpers.stop_cassandra()
+    helpers.remount_cassandra()
+    helpers.ensure_database_directories()
+    if helpers.peer_relid() and not helpers.is_bootstrapped():
+        helpers.status_set('maintenance', 'Bootstrapping')
+    else:
+        helpers.status_set('maintenance', 'Starting Cassandra')
+    helpers.start_cassandra()
+
+
+@action
+def post_bootstrap():
+    '''Maintain state on whether the node has bootstrapped into the cluster.
+
+    Per the documented procedure for adding new units to a cluster, wait
+    2 minutes after the unit bootstraps so that other units do not
+    attempt to bootstrap too soon. Also, wait until joining has
+    completed, so that we keep the lock and other nodes don't restart
+    or bootstrap.
+    '''
+    if not helpers.is_bootstrapped():
+        if coordinator.relid is not None:
+            helpers.status_set('maintenance', 'Post-bootstrap 2 minute delay')
+            hookenv.log('Post-bootstrap 2 minute delay')
+            time.sleep(120)  # Must wait 2 minutes between bootstrapping nodes.
+
+        join_msg_set = False
+        while True:
+            status = helpers.get_node_status()
+            if status == 'NORMAL':
+                break
+            elif status == 'JOINING':
+                if not join_msg_set:
+                    helpers.status_set('maintenance', 'Still joining cluster')
+                    join_msg_set = True
+                time.sleep(10)
+                continue
+            else:
+                if status is None:
+                    helpers.status_set('blocked',
+                                       'Unexpectedly shut down during '
+                                       'bootstrap')
+                else:
+                    helpers.status_set('blocked',
+                                       'Failed to bootstrap ({})'
+                                       ''.format(status))
+                raise SystemExit(0)
+
+    # Unconditionally call this to publish the bootstrapped flag to
+    # the peer relation, as the first unit was bootstrapped before
+    # the peer relation existed.
+    helpers.set_bootstrapped()
+
+
+@action
+def stop_cassandra():
+    helpers.stop_cassandra()
+
+
+@action
+def start_cassandra():
+    helpers.start_cassandra()
+
+
+@leader_only
+@action
+@authentication
+def create_unit_superusers():
+    # The leader creates and updates accounts for nodes, using the
+    # encrypted password they provide in relations.PeerRelation. We
+    # don't end up with unencrypted passwords leaving the unit, and we
+    # don't need to restart Cassandra in no-auth mode, which is slow
+    # and may interrupt the bootstrap.
+    if not coordinator.relid:
+        return  # No peer relation, no requests yet.
+
+    created_units = helpers.get_unit_superusers()
+    uncreated_units = [u for u in hookenv.related_units(coordinator.relid)
+                       if u not in created_units]
+    for peer in uncreated_units:
+        rel = hookenv.relation_get(unit=peer, rid=coordinator.relid)
+        username = rel.get('username')
+        pwhash = rel.get('pwhash')
+        if not username:
+            continue
+        hookenv.log('Creating {} account for {}'.format(username, peer))
+        with helpers.connect() as session:
+            helpers.ensure_user(session, username, pwhash, superuser=True)
+        created_units.add(peer)
+    helpers.set_unit_superusers(created_units)
+
+
+@action
+def reset_all_io_schedulers():
+    dirs = helpers.get_all_database_directories()
+    dirs = (dirs['data_file_directories'] + [dirs['commitlog_directory']] +
+            [dirs['saved_caches_directory']])
+    config = hookenv.config()
+    for d in dirs:
+        if os.path.isdir(d):  # Directory may not exist yet.
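+            # set_io_scheduler presumably writes the configured scheduler
+            # to /sys/block/<device>/queue/scheduler for the device backing
+            # the directory (an assumption about the helper, noted here
+            # for clarity, not verified).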
+            helpers.set_io_scheduler(config['io_scheduler'], d)
+
+
+def _client_credentials(relid):
+    '''Return the client credentials used by relation relid.'''
+    relinfo = hookenv.relation_get(unit=hookenv.local_unit(), rid=relid)
+    username = relinfo.get('username')
+    password = relinfo.get('password')
+    if username is None or password is None:
+        for unit in hookenv.related_units(coordinator.relid):
+            try:
+                relinfo = hookenv.relation_get(unit=unit, rid=relid)
+                username = relinfo.get('username')
+                password = relinfo.get('password')
+                if username is not None and password is not None:
+                    return username, password
+            except subprocess.CalledProcessError:
+                pass  # Assume the remote unit has not joined yet.
+        return None, None
+    else:
+        return username, password
+
+
+def _publish_database_relation(relid, superuser):
+    # The Cassandra service needs to provide a common set of credentials
+    # to a client unit. The leader creates these, if none of the other
+    # units are found to have published them already (a previously elected
+    # leader may have done this). The leader then tickles the other units,
+    # firing a hook and giving them the opportunity to copy and publish
+    # these credentials.
+    username, password = _client_credentials(relid)
+    if username is None:
+        if hookenv.is_leader():
+            # Credentials not set. The leader must generate them. We use
+            # the service name so that database permissions remain valid
+            # even after the relation is dropped and recreated, or the
+            # juju environment is rebuilt and the database restored from
+            # backups.
+            service_name = helpers.get_service_name(relid)
+            if not service_name:
+                # Per Bug #1555261, we might not yet have related units,
+                # so no way to calculate the remote service name and thus
+                # the user.
+                return  # Try again later.
+            username = 'juju_{}'.format(service_name)
+            if superuser:
+                username += '_admin'
+            password = host.pwgen()
+            pwhash = helpers.encrypt_password(password)
+            with helpers.connect() as session:
+                helpers.ensure_user(session, username, pwhash, superuser)
+            # Wake the peers, if any.
+            helpers.leader_ping()
+        else:
+            return  # No credentials yet. Nothing to do.
+
+    # Publish the information the client needs on the relation where
+    # they can find it.
+    # - authentication credentials
+    # - address and port
+    # - cluster_name, so clients can differentiate multiple clusters
+    # - datacenter + rack, so clients know what names they can use
+    #   when altering keyspace replication settings.
+    config = hookenv.config()
+    hookenv.relation_set(relid,
+                         username=username, password=password,
+                         host=hookenv.unit_public_ip(),
+                         native_transport_port=config['native_transport_port'],
+                         rpc_port=config['rpc_port'],
+                         cluster_name=config['cluster_name'],
+                         datacenter=config['datacenter'],
+                         rack=config['rack'])
+
+
+@action
+def publish_database_relations():
+    for relid in hookenv.relation_ids('database'):
+        _publish_database_relation(relid, superuser=False)
+
+
+@action
+def publish_database_admin_relations():
+    for relid in hookenv.relation_ids('database-admin'):
+        _publish_database_relation(relid, superuser=True)
+
+
+@action
+def install_maintenance_crontab():
+    # Every unit should run repair once per week (at least once per
+    # GCGraceSeconds, which defaults to 10 days but can be changed per
+    # keyspace). Distribute the repair time evenly over the week.
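+    # For example, assuming week_spread() maps the unit number to a
+    # (day-of-week, hour, minute) tuple, unit 0 and unit 1 land in
+    # different cron slots, so cluster-wide repairs never pile up on a
+    # single day.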
+    unit_num = int(hookenv.local_unit().split('/')[-1])
+    dow, hour, minute = helpers.week_spread(unit_num)
+    contents = jinja.render('cassandra_maintenance_cron.tmpl', vars())
+    cron_path = "/etc/cron.d/cassandra-maintenance"
+    host.write_file(cron_path, contents.encode('US-ASCII'))
+
+
+@action
+def emit_cluster_info():
+    helpers.emit_describe_cluster()
+    helpers.emit_status()
+    helpers.emit_netstats()
+
+
+@action
+def configure_firewall():
+    '''Configure firewall rules using ufw.
+
+    This is primarily to block access to the replication and JMX ports,
+    as juju's default port access controls are not strict enough and
+    allow access to the entire environment.
+    '''
+    config = hookenv.config()
+    ufw.enable(soft_fail=True)
+
+    # Enable SSH from anywhere, relying on Juju and external firewalls
+    # to control access.
+    ufw.service('ssh', 'open')
+    ufw.service('nrpe', 'open')  # Also NRPE for nagios checks.
+    ufw.service('rsync', 'open')  # Also rsync for data transfer and backups.
+
+    # Clients need client access. These protocols are configured to
+    # require authentication.
+    client_keys = ['native_transport_port', 'rpc_port']
+    client_ports = [config[key] for key in client_keys]
+
+    # Peers need replication access. These protocols do not require
+    # authentication, so firewall them off from all but known nodes.
+    peer_ports = [config['storage_port'], config['ssl_storage_port']]
+
+    # Enable client access from anywhere. Juju and external firewalls
+    # can still restrict this further, of course (e.g. 'juju expose').
+    for key in client_keys:
+        if config.changed(key) and config.previous(key) is not None:
+            # First close old ports. We use this order in the unlikely case
+            # someone is trying to swap the native and Thrift ports.
+            ufw.service(config.previous(key), 'close')
+    for port in client_ports:
+        # Then open or close the configured ports.
+        ufw.service(port, 'open')
+
+    desired_rules = set()  # ufw.grant_access/remove_access commands.
+
+    # Rules for peers.
+    for relinfo in hookenv.relations_of_type('cluster'):
+        if relinfo['private-address']:
+            pa = hookenv._ensure_ip(relinfo['private-address'])
+            for port in peer_ports:
+                desired_rules.add((pa, 'any', port))
+
+    # Rules for admin connections. We allow database-admin relations
+    # access to the cluster communication ports so that tools like
+    # sstableloader can run.
+    for relinfo in hookenv.relations_of_type('database-admin'):
+        if relinfo['private-address']:
+            pa = hookenv._ensure_ip(relinfo['private-address'])
+            for port in peer_ports:
+                desired_rules.add((pa, 'any', port))
+
+    previous_rules = set(tuple(rule) for rule in config.get('ufw_rules', []))
+
+    # Close any rules previously opened that are no longer desired.
+    for rule in sorted(list(previous_rules - desired_rules)):
+        ufw.revoke_access(*rule)
+
+    # Open all the desired rules.
+    for rule in sorted(list(desired_rules)):
+        ufw.grant_access(*rule)
+
+    # Store our rules for next time. Note that this is inherently racy:
+    # the value is only persisted if the hook exits cleanly. If the hook
+    # fails, someone then changes the port configuration or IP addresses
+    # change, and the failed hook is retried, we can lose track of
+    # previously granted rules and they will never be revoked. It is
+    # impossible to remove this race entirely, so we stick with this
+    # simple approach.
+    config['ufw_rules'] = list(desired_rules)  # A list because JSON.
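+    # Illustrative example of the persisted value (addresses and ports
+    # assumed, not taken from a real deployment):
+    #     [['10.0.0.12', 'any', 7000], ['10.0.0.12', 'any', 7001]]
+    # i.e. (address, protocol, port) triples, stored as lists because
+    # the value is round-tripped through JSON.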
+
+
+@action
+def nrpe_external_master_relation():
+    '''Configure the nrpe-external-master relation.'''
+    local_plugins = helpers.local_plugins_dir()
+    if os.path.exists(local_plugins):
+        src = os.path.join(hookenv.charm_dir(),
+                           "files", "check_cassandra_heap.sh")
+        with open(src, 'rb') as f:
+            host.write_file(os.path.join(local_plugins,
+                                         'check_cassandra_heap.sh'),
+                            f.read(), perms=0o555)
+
+    nrpe_compat = nrpe.NRPE()
+    conf = hookenv.config()
+
+    cassandra_heap_warn = conf.get('nagios_heapchk_warn_pct')
+    cassandra_heap_crit = conf.get('nagios_heapchk_crit_pct')
+    if cassandra_heap_warn and cassandra_heap_crit:
+        nrpe_compat.add_check(
+            shortname="cassandra_heap",
+            description="Check Cassandra Heap",
+            check_cmd="check_cassandra_heap.sh localhost {} {}"
+                      "".format(cassandra_heap_warn, cassandra_heap_crit))
+
+    cassandra_disk_warn = conf.get('nagios_disk_warn_pct')
+    cassandra_disk_crit = conf.get('nagios_disk_crit_pct')
+    dirs = helpers.get_all_database_directories()
+    dirs = set(dirs['data_file_directories'] +
+               [dirs['commitlog_directory'], dirs['saved_caches_directory']])
+    # We need to check the space on the mountpoint, not on the actual
+    # directory, as the nagios user won't have access to the actual
+    # directory.
+    mounts = set(helpers.mountpoint(d) for d in dirs)
+    for disk in mounts:
+        check_name = re.sub('[^A-Za-z0-9_]', '_', disk)
+        if cassandra_disk_warn and cassandra_disk_crit:
+            shortname = "cassandra_disk{}".format(check_name)
+            hookenv.log("Adding disk utilization check {}".format(shortname),
+                        DEBUG)
+            nrpe_compat.add_check(
+                shortname=shortname,
+                description="Check Cassandra Disk {}".format(disk),
+                check_cmd="check_disk -u GB -w {}% -c {}% -K 5% -p {}"
+                          "".format(cassandra_disk_warn, cassandra_disk_crit,
+                                    disk))
+    nrpe_compat.write()
+
+
+@leader_only
+@action
+def maintain_seeds():
+    '''The leader needs to maintain the list of seed nodes.'''
+    seed_ips = helpers.get_seed_ips()
+    hookenv.log('Current seeds == {!r}'.format(seed_ips), DEBUG)
+
+    bootstrapped_ips = helpers.get_bootstrapped_ips()
+    hookenv.log('Bootstrapped == {!r}'.format(bootstrapped_ips), DEBUG)
+
+    # Remove any seeds that are no longer bootstrapped, such as dropped
+    # units.
+    seed_ips.intersection_update(bootstrapped_ips)
+
+    # Add more bootstrapped nodes, if necessary, to get to our maximum
+    # of 3 seeds.
+    potential_seed_ips = list(reversed(sorted(bootstrapped_ips)))
+    while len(seed_ips) < 3 and potential_seed_ips:
+        seed_ips.add(potential_seed_ips.pop())
+
+    # If there are no seeds or bootstrapped nodes, start with the leader. Us.
+    if len(seed_ips) == 0:
+        seed_ips.add(hookenv.unit_private_ip())
+
+    hookenv.log('Updated seeds == {!r}'.format(seed_ips), DEBUG)
+
+    hookenv.leader_set(seeds=','.join(sorted(seed_ips)))
+
+
+@leader_only
+@action
+@authentication
+def reset_default_password():
+    if hookenv.leader_get('default_admin_password_changed'):
+        hookenv.log('Default admin password already changed')
+        return
+
+    # Cassandra ships with well known credentials, rather than
+    # providing a tool to reset credentials. This is a huge security
+    # hole we must close.
+    try:
+        # We need a big timeout here, as the cassandra user actually
+        # springs into existence some time after Cassandra has started
+        # up and is accepting connections.
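+        # 'cassandra'/'cassandra' are Cassandra's well-known factory
+        # credentials. They only work here until the ALTER USER below
+        # replaces the password with a random one.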
+        with helpers.connect('cassandra', 'cassandra',
+                             timeout=120, auth_timeout=120) as session:
+            # But before we close this security hole, we need to use these
+            # credentials to create a different admin account for the
+            # leader, allowing it to create accounts for other nodes as they
+            # join. The alternative is restarting Cassandra without
+            # authentication, which this charm will likely need to do in the
+            # future when we allow Cassandra services to be related together.
+            helpers.status_set('maintenance',
+                               'Creating initial superuser account')
+            username, password = helpers.superuser_credentials()
+            pwhash = helpers.encrypt_password(password)
+            helpers.ensure_user(session, username, pwhash, superuser=True)
+            helpers.set_unit_superusers([hookenv.local_unit()])
+
+            helpers.status_set('maintenance',
+                               'Changing default admin password')
+            helpers.query(session, 'ALTER USER cassandra WITH PASSWORD %s',
+                          cassandra.ConsistencyLevel.ALL, (host.pwgen(),))
+    except cassandra.AuthenticationFailed:
+        hookenv.log('Default superuser account already reset')
+        try:
+            with helpers.connect():
+                hookenv.log("Leader's superuser account already created")
+        except cassandra.AuthenticationFailed:
+            # We have no known superuser credentials. Create the account
+            # the hard, slow way. This will be the normal method
+            # of creating the service's initial account when we allow
+            # services to be related together.
+            helpers.create_unit_superuser_hard()
+
+    hookenv.leader_set(default_admin_password_changed=True)
+
+
+@action
+def set_active():
+    # If we got this far, the unit is active. Update the status if it is
+    # not already active. We don't do this unconditionally, as the charm
+    # may be active but busy, for example waiting for restart permission.
+    if hookenv.status_get()[0] != 'active':
+        helpers.set_active()
+    else:
+        hookenv.log('Unit status already active', DEBUG)
+
+
+@action
+@authentication
+def request_unit_superuser():
+    relid = helpers.peer_relid()
+    if relid is None:
+        hookenv.log('Request deferred until peer relation exists')
+        return
+
+    relinfo = hookenv.relation_get(unit=hookenv.local_unit(),
+                                   rid=relid)
+    if relinfo and relinfo.get('username'):
+        # We must avoid blindly setting the pwhash on the relation,
+        # as we would get a different value every time we encrypt
+        # the password, due to the random salt.
+        hookenv.log('Superuser account request previously made')
+    else:
+        # Publish the requested superuser and hash to our peers.
+        username, password = helpers.superuser_credentials()
+        pwhash = helpers.encrypt_password(password)
+        hookenv.relation_set(relid, username=username, pwhash=pwhash)
+        hookenv.log('Requested superuser account creation')
+
+
+@action
+def update_etc_hosts():
+    hostname = socket.gethostname()
+    addr = hookenv.unit_private_ip()
+    hosts_map = {addr: hostname}
+    # We only need to add ourselves to /etc/hosts.
+    helpers.update_hosts_file('/etc/hosts', hosts_map)
--