# Copyright 2014-2015 Canonical Limited. # # This file is part of charm-helpers. # # charm-helpers is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License version 3 as # published by the Free Software Foundation. # # charm-helpers is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with charm-helpers. If not, see . "Interactions with the Juju environment" # Copyright 2013 Canonical Ltd. # # Authors: # Charm Helpers Developers from __future__ import print_function from functools import wraps import os import json import yaml import subprocess import sys import errno import tempfile from subprocess import CalledProcessError import six if not six.PY3: from UserDict import UserDict else: from collections import UserDict CRITICAL = "CRITICAL" ERROR = "ERROR" WARNING = "WARNING" INFO = "INFO" DEBUG = "DEBUG" MARKER = object() cache = {} def cached(func): """Cache return values for multiple executions of func + args For example:: @cached def unit_get(attribute): pass unit_get('test') will cache the result of unit_get + 'test' for future calls. """ @wraps(func) def wrapper(*args, **kwargs): global cache key = str((func, args, kwargs)) try: return cache[key] except KeyError: pass # Drop out of the exception handler scope. res = func(*args, **kwargs) cache[key] = res return res return wrapper def flush(key): """Flushes any entries from function cache where the key is found in the function+args """ flush_list = [] for item in cache: if key in item: flush_list.append(item) for item in flush_list: del cache[item] def log(message, level=None): """Write a message to the juju log""" command = ['juju-log'] if level: command += ['-l', level] if not isinstance(message, six.string_types): message = repr(message) command += [message] # Missing juju-log should not cause failures in unit tests # Send log output to stderr try: subprocess.call(command) except OSError as e: if e.errno == errno.ENOENT: if level: message = "{}: {}".format(level, message) message = "juju-log: {}".format(message) print(message, file=sys.stderr) else: raise class Serializable(UserDict): """Wrapper, an object that can be serialized to yaml or json""" def __init__(self, obj): # wrap the object UserDict.__init__(self) self.data = obj def __getattr__(self, attr): # See if this object has attribute. if attr in ("json", "yaml", "data"): return self.__dict__[attr] # Check for attribute in wrapped object. got = getattr(self.data, attr, MARKER) if got is not MARKER: return got # Proxy to the wrapped object via dict interface. try: return self.data[attr] except KeyError: raise AttributeError(attr) def __getstate__(self): # Pickle as a standard dictionary. return self.data def __setstate__(self, state): # Unpickle into our wrapper. self.data = state def json(self): """Serialize the object to json""" return json.dumps(self.data) def yaml(self): """Serialize the object to yaml""" return yaml.dump(self.data) def execution_environment(): """A convenient bundling of the current execution context""" context = {} context['conf'] = config() if relation_id(): context['reltype'] = relation_type() context['relid'] = relation_id() context['rel'] = relation_get() context['unit'] = local_unit() context['rels'] = relations() context['env'] = os.environ return context def in_relation_hook(): """Determine whether we're running in a relation hook""" return 'JUJU_RELATION' in os.environ def relation_type(): """The scope for the current relation hook""" return os.environ.get('JUJU_RELATION', None) def relation_id(): """The relation ID for the current relation hook""" return os.environ.get('JUJU_RELATION_ID', None) def local_unit(): """Local unit ID""" return os.environ['JUJU_UNIT_NAME'] def remote_unit(): """The remote unit for the current relation hook""" return os.environ.get('JUJU_REMOTE_UNIT', None) def service_name(): """The name service group this unit belongs to""" return local_unit().split('/')[0] def hook_name(): """The name of the currently executing hook""" return os.path.basename(sys.argv[0]) class Config(dict): """A dictionary representation of the charm's config.yaml, with some extra features: - See which values in the dictionary have changed since the previous hook. - For values that have changed, see what the previous value was. - Store arbitrary data for use in a later hook. NOTE: Do not instantiate this object directly - instead call ``hookenv.config()``, which will return an instance of :class:`Config`. Example usage:: >>> # inside a hook >>> from charmhelpers.core import hookenv >>> config = hookenv.config() >>> config['foo'] 'bar' >>> # store a new key/value for later use >>> config['mykey'] = 'myval' >>> # user runs `juju set mycharm foo=baz` >>> # now we're inside subsequent config-changed hook >>> config = hookenv.config() >>> config['foo'] 'baz' >>> # test to see if this val has changed since last hook >>> config.changed('foo') True >>> # what was the previous value? >>> config.previous('foo') 'bar' >>> # keys/values that we add are preserved across hooks >>> config['mykey'] 'myval' """ CONFIG_FILE_NAME = '.juju-persistent-config' def __init__(self, *args, **kw): super(Config, self).__init__(*args, **kw) self.implicit_save = True self._prev_dict = None self.path = os.path.join(charm_dir(), Config.CONFIG_FILE_NAME) if os.path.exists(self.path): self.load_previous() def __getitem__(self, key): """For regular dict lookups, check the current juju config first, then the previous (saved) copy. This ensures that user-saved values will be returned by a dict lookup. """ try: return dict.__getitem__(self, key) except KeyError: return (self._prev_dict or {})[key] def get(self, key, default=None): try: return self[key] except KeyError: return default def keys(self): prev_keys = [] if self._prev_dict is not None: prev_keys = self._prev_dict.keys() return list(set(prev_keys + list(dict.keys(self)))) def load_previous(self, path=None): """Load previous copy of config from disk. In normal usage you don't need to call this method directly - it is called automatically at object initialization. :param path: File path from which to load the previous config. If `None`, config is loaded from the default location. If `path` is specified, subsequent `save()` calls will write to the same path. """ self.path = path or self.path with open(self.path) as f: self._prev_dict = json.load(f) def changed(self, key): """Return True if the current value for this key is different from the previous value. """ if self._prev_dict is None: return True return self.previous(key) != self.get(key) def previous(self, key): """Return previous value for this key, or None if there is no previous value. """ if self._prev_dict: return self._prev_dict.get(key) return None def save(self): """Save this config to disk. If the charm is using the :mod:`Services Framework ` or :meth:'@hook ' decorator, this is called automatically at the end of successful hook execution. Otherwise, it should be called directly by user code. To disable automatic saves, set ``implicit_save=False`` on this instance. """ if self._prev_dict: for k, v in six.iteritems(self._prev_dict): if k not in self: self[k] = v with open(self.path, 'w') as f: json.dump(self, f) @cached def config(scope=None): """Juju charm configuration""" config_cmd_line = ['config-get'] if scope is not None: config_cmd_line.append(scope) config_cmd_line.append('--format=json') try: config_data = json.loads( subprocess.check_output(config_cmd_line).decode('UTF-8')) if scope is not None: return config_data return Config(config_data) except ValueError: return None @cached def relation_get(attribute=None, unit=None, rid=None): """Get relation information""" _args = ['relation-get', '--format=json'] if rid: _args.append('-r') _args.append(rid) _args.append(attribute or '-') if unit: _args.append(unit) try: return json.loads(subprocess.check_output(_args).decode('UTF-8')) except ValueError: return None except CalledProcessError as e: if e.returncode == 2: return None raise def relation_set(relation_id=None, relation_settings=None, **kwargs): """Set relation information for the current unit""" relation_settings = relation_settings if relation_settings else {} relation_cmd_line = ['relation-set'] accepts_file = "--file" in subprocess.check_output( relation_cmd_line + ["--help"], universal_newlines=True) if relation_id is not None: relation_cmd_line.extend(('-r', relation_id)) settings = relation_settings.copy() settings.update(kwargs) for key, value in settings.items(): # Force value to be a string: it always should, but some call # sites pass in things like dicts or numbers. if value is not None: settings[key] = "{}".format(value) if accepts_file: # --file was introduced in Juju 1.23.2. Use it by default if # available, since otherwise we'll break if the relation data is # too big. Ideally we should tell relation-set to read the data from # stdin, but that feature is broken in 1.23.2: Bug #1454678. with tempfile.NamedTemporaryFile(delete=False) as settings_file: settings_file.write(yaml.safe_dump(settings).encode("utf-8")) subprocess.check_call( relation_cmd_line + ["--file", settings_file.name]) os.remove(settings_file.name) else: for key, value in settings.items(): if value is None: relation_cmd_line.append('{}='.format(key)) else: relation_cmd_line.append('{}={}'.format(key, value)) subprocess.check_call(relation_cmd_line) # Flush cache of any relation-gets for local unit flush(local_unit()) def relation_clear(r_id=None): ''' Clears any relation data already set on relation r_id ''' settings = relation_get(rid=r_id, unit=local_unit()) for setting in settings: if setting not in ['public-address', 'private-address']: settings[setting] = None relation_set(relation_id=r_id, **settings) @cached def relation_ids(reltype=None): """A list of relation_ids""" reltype = reltype or relation_type() relid_cmd_line = ['relation-ids', '--format=json'] if reltype is not None: relid_cmd_line.append(reltype) return json.loads( subprocess.check_output(relid_cmd_line).decode('UTF-8')) or [] return [] @cached def related_units(relid=None): """A list of related units""" relid = relid or relation_id() units_cmd_line = ['relation-list', '--format=json'] if relid is not None: units_cmd_line.extend(('-r', relid)) return json.loads( subprocess.check_output(units_cmd_line).decode('UTF-8')) or [] @cached def relation_for_unit(unit=None, rid=None): """Get the json represenation of a unit's relation""" unit = unit or remote_unit() relation = relation_get(unit=unit, rid=rid) for key in relation: if key.endswith('-list'): relation[key] = relation[key].split() relation['__unit__'] = unit return relation @cached def relations_for_id(relid=None): """Get relations of a specific relation ID""" relation_data = [] relid = relid or relation_ids() for unit in related_units(relid): unit_data = relation_for_unit(unit, relid) unit_data['__relid__'] = relid relation_data.append(unit_data) return relation_data @cached def relations_of_type(reltype=None): """Get relations of a specific type""" relation_data = [] reltype = reltype or relation_type() for relid in relation_ids(reltype): for relation in relations_for_id(relid): relation['__relid__'] = relid relation_data.append(relation) return relation_data @cached def metadata(): """Get the current charm metadata.yaml contents as a python object""" with open(os.path.join(charm_dir(), 'metadata.yaml')) as md: return yaml.safe_load(md) @cached def relation_types(): """Get a list of relation types supported by this charm""" rel_types = [] md = metadata() for key in ('provides', 'requires', 'peers'): section = md.get(key) if section: rel_types.extend(section.keys()) return rel_types @cached def charm_name(): """Get the name of the current charm as is specified on metadata.yaml""" return metadata().get('name') @cached def relations(): """Get a nested dictionary of relation data for all related units""" rels = {} for reltype in relation_types(): relids = {} for relid in relation_ids(reltype): units = {local_unit(): relation_get(unit=local_unit(), rid=relid)} for unit in related_units(relid): reldata = relation_get(unit=unit, rid=relid) units[unit] = reldata relids[relid] = units rels[reltype] = relids return rels @cached def is_relation_made(relation, keys='private-address'): ''' Determine whether a relation is established by checking for presence of key(s). If a list of keys is provided, they must all be present for the relation to be identified as made ''' if isinstance(keys, str): keys = [keys] for r_id in relation_ids(relation): for unit in related_units(r_id): context = {} for k in keys: context[k] = relation_get(k, rid=r_id, unit=unit) if None not in context.values(): return True return False def open_port(port, protocol="TCP"): """Open a service network port""" _args = ['open-port'] _args.append('{}/{}'.format(port, protocol)) subprocess.check_call(_args) def close_port(port, protocol="TCP"): """Close a service network port""" _args = ['close-port'] _args.append('{}/{}'.format(port, protocol)) subprocess.check_call(_args) @cached def unit_get(attribute): """Get the unit ID for the remote unit""" _args = ['unit-get', '--format=json', attribute] try: return json.loads(subprocess.check_output(_args).decode('UTF-8')) except ValueError: return None def unit_public_ip(): """Get this unit's public IP address""" return unit_get('public-address') def unit_private_ip(): """Get this unit's private IP address""" return unit_get('private-address') class UnregisteredHookError(Exception): """Raised when an undefined hook is called""" pass class Hooks(object): """A convenient handler for hook functions. Example:: hooks = Hooks() # register a hook, taking its name from the function name @hooks.hook() def install(): pass # your code here # register a hook, providing a custom hook name @hooks.hook("config-changed") def config_changed(): pass # your code here if __name__ == "__main__": # execute a hook based on the name the program is called by hooks.execute(sys.argv) """ def __init__(self, config_save=True): super(Hooks, self).__init__() self._hooks = {} self._config_save = config_save def register(self, name, function): """Register a hook""" self._hooks[name] = function def execute(self, args): """Execute a registered hook based on args[0]""" hook_name = os.path.basename(args[0]) if hook_name in self._hooks: self._hooks[hook_name]() if self._config_save: cfg = config() if cfg.implicit_save: cfg.save() else: raise UnregisteredHookError(hook_name) def hook(self, *hook_names): """Decorator, registering them as hooks""" def wrapper(decorated): for hook_name in hook_names: self.register(hook_name, decorated) else: self.register(decorated.__name__, decorated) if '_' in decorated.__name__: self.register( decorated.__name__.replace('_', '-'), decorated) return decorated return wrapper def charm_dir(): """Return the root directory of the current charm""" return os.environ.get('CHARM_DIR') @cached def action_get(key=None): """Gets the value of an action parameter, or all key/value param pairs""" cmd = ['action-get'] if key is not None: cmd.append(key) cmd.append('--format=json') action_data = json.loads(subprocess.check_output(cmd).decode('UTF-8')) return action_data def action_set(values): """Sets the values to be returned after the action finishes""" cmd = ['action-set'] for k, v in list(values.items()): cmd.append('{}={}'.format(k, v)) subprocess.check_call(cmd) def action_fail(message): """Sets the action status to failed and sets the error message. The results set by action_set are preserved.""" subprocess.check_call(['action-fail', message]) def status_set(workload_state, message): """Set the workload state with a message Use status-set to set the workload state with a message which is visible to the user via juju status. If the status-set command is not found then assume this is juju < 1.23 and juju-log the message unstead. workload_state -- valid juju workload state. message -- status update message """ valid_states = ['maintenance', 'blocked', 'waiting', 'active'] if workload_state not in valid_states: raise ValueError( '{!r} is not a valid workload state'.format(workload_state) ) cmd = ['status-set', workload_state, message] try: ret = subprocess.call(cmd) if ret == 0: return except OSError as e: if e.errno != errno.ENOENT: raise log_message = 'status-set failed: {} {}'.format(workload_state, message) log(log_message, level='INFO') def status_get(): """Retrieve the previously set juju workload state If the status-set command is not found then assume this is juju < 1.23 and return 'unknown' """ cmd = ['status-get'] try: raw_status = subprocess.check_output(cmd, universal_newlines=True) status = raw_status.rstrip() return status except OSError as e: if e.errno == errno.ENOENT: return 'unknown' else: raise def translate_exc(from_exc, to_exc): def inner_translate_exc1(f): def inner_translate_exc2(*args, **kwargs): try: return f(*args, **kwargs) except from_exc: raise to_exc return inner_translate_exc2 return inner_translate_exc1 @translate_exc(from_exc=OSError, to_exc=NotImplementedError) def is_leader(): """Does the current unit hold the juju leadership Uses juju to determine whether the current unit is the leader of its peers """ cmd = ['is-leader', '--format=json'] return json.loads(subprocess.check_output(cmd).decode('UTF-8')) @translate_exc(from_exc=OSError, to_exc=NotImplementedError) def leader_get(attribute=None): """Juju leader get value(s)""" cmd = ['leader-get', '--format=json'] + [attribute or '-'] return json.loads(subprocess.check_output(cmd).decode('UTF-8')) @translate_exc(from_exc=OSError, to_exc=NotImplementedError) def leader_set(settings=None, **kwargs): """Juju leader set value(s)""" log("Juju leader-set '%s'" % (settings), level=DEBUG) cmd = ['leader-set'] settings = settings or {} settings.update(kwargs) for k, v in settings.iteritems(): if v is None: cmd.append('{}='.format(k)) else: cmd.append('{}={}'.format(k, v)) subprocess.check_call(cmd)