aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/contrail-control/hooks/charmhelpers/contrib/network')
-rw-r--r--charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/__init__.py15
-rw-r--r--charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ip.py456
-rw-r--r--charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ovs/__init__.py96
-rw-r--r--charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ufw.py318
4 files changed, 0 insertions, 885 deletions
diff --git a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/__init__.py b/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/__init__.py
deleted file mode 100644
index d1400a0..0000000
--- a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright 2014-2015 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
diff --git a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ip.py b/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ip.py
deleted file mode 100644
index 7f3b66b..0000000
--- a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ip.py
+++ /dev/null
@@ -1,456 +0,0 @@
-# Copyright 2014-2015 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-import glob
-import re
-import subprocess
-import six
-import socket
-
-from functools import partial
-
-from charmhelpers.core.hookenv import unit_get
-from charmhelpers.fetch import apt_install, apt_update
-from charmhelpers.core.hookenv import (
- log,
- WARNING,
-)
-
-try:
- import netifaces
-except ImportError:
- apt_update(fatal=True)
- apt_install('python-netifaces', fatal=True)
- import netifaces
-
-try:
- import netaddr
-except ImportError:
- apt_update(fatal=True)
- apt_install('python-netaddr', fatal=True)
- import netaddr
-
-
-def _validate_cidr(network):
- try:
- netaddr.IPNetwork(network)
- except (netaddr.core.AddrFormatError, ValueError):
- raise ValueError("Network (%s) is not in CIDR presentation format" %
- network)
-
-
-def no_ip_found_error_out(network):
- errmsg = ("No IP address found in network: %s" % network)
- raise ValueError(errmsg)
-
-
-def get_address_in_network(network, fallback=None, fatal=False):
- """Get an IPv4 or IPv6 address within the network from the host.
-
- :param network (str): CIDR presentation format. For example,
- '192.168.1.0/24'.
- :param fallback (str): If no address is found, return fallback.
- :param fatal (boolean): If no address is found, fallback is not
- set and fatal is True then exit(1).
- """
- if network is None:
- if fallback is not None:
- return fallback
-
- if fatal:
- no_ip_found_error_out(network)
- else:
- return None
-
- _validate_cidr(network)
- network = netaddr.IPNetwork(network)
- for iface in netifaces.interfaces():
- addresses = netifaces.ifaddresses(iface)
- if network.version == 4 and netifaces.AF_INET in addresses:
- addr = addresses[netifaces.AF_INET][0]['addr']
- netmask = addresses[netifaces.AF_INET][0]['netmask']
- cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
- if cidr in network:
- return str(cidr.ip)
-
- if network.version == 6 and netifaces.AF_INET6 in addresses:
- for addr in addresses[netifaces.AF_INET6]:
- if not addr['addr'].startswith('fe80'):
- cidr = netaddr.IPNetwork("%s/%s" % (addr['addr'],
- addr['netmask']))
- if cidr in network:
- return str(cidr.ip)
-
- if fallback is not None:
- return fallback
-
- if fatal:
- no_ip_found_error_out(network)
-
- return None
-
-
-def is_ipv6(address):
- """Determine whether provided address is IPv6 or not."""
- try:
- address = netaddr.IPAddress(address)
- except netaddr.AddrFormatError:
- # probably a hostname - so not an address at all!
- return False
-
- return address.version == 6
-
-
-def is_address_in_network(network, address):
- """
- Determine whether the provided address is within a network range.
-
- :param network (str): CIDR presentation format. For example,
- '192.168.1.0/24'.
- :param address: An individual IPv4 or IPv6 address without a net
- mask or subnet prefix. For example, '192.168.1.1'.
- :returns boolean: Flag indicating whether address is in network.
- """
- try:
- network = netaddr.IPNetwork(network)
- except (netaddr.core.AddrFormatError, ValueError):
- raise ValueError("Network (%s) is not in CIDR presentation format" %
- network)
-
- try:
- address = netaddr.IPAddress(address)
- except (netaddr.core.AddrFormatError, ValueError):
- raise ValueError("Address (%s) is not in correct presentation format" %
- address)
-
- if address in network:
- return True
- else:
- return False
-
-
-def _get_for_address(address, key):
- """Retrieve an attribute of or the physical interface that
- the IP address provided could be bound to.
-
- :param address (str): An individual IPv4 or IPv6 address without a net
- mask or subnet prefix. For example, '192.168.1.1'.
- :param key: 'iface' for the physical interface name or an attribute
- of the configured interface, for example 'netmask'.
- :returns str: Requested attribute or None if address is not bindable.
- """
- address = netaddr.IPAddress(address)
- for iface in netifaces.interfaces():
- addresses = netifaces.ifaddresses(iface)
- if address.version == 4 and netifaces.AF_INET in addresses:
- addr = addresses[netifaces.AF_INET][0]['addr']
- netmask = addresses[netifaces.AF_INET][0]['netmask']
- network = netaddr.IPNetwork("%s/%s" % (addr, netmask))
- cidr = network.cidr
- if address in cidr:
- if key == 'iface':
- return iface
- else:
- return addresses[netifaces.AF_INET][0][key]
-
- if address.version == 6 and netifaces.AF_INET6 in addresses:
- for addr in addresses[netifaces.AF_INET6]:
- if not addr['addr'].startswith('fe80'):
- network = netaddr.IPNetwork("%s/%s" % (addr['addr'],
- addr['netmask']))
- cidr = network.cidr
- if address in cidr:
- if key == 'iface':
- return iface
- elif key == 'netmask' and cidr:
- return str(cidr).split('/')[1]
- else:
- return addr[key]
-
- return None
-
-
-get_iface_for_address = partial(_get_for_address, key='iface')
-
-
-get_netmask_for_address = partial(_get_for_address, key='netmask')
-
-
-def format_ipv6_addr(address):
- """If address is IPv6, wrap it in '[]' otherwise return None.
-
- This is required by most configuration files when specifying IPv6
- addresses.
- """
- if is_ipv6(address):
- return "[%s]" % address
-
- return None
-
-
-def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
- fatal=True, exc_list=None):
- """Return the assigned IP address for a given interface, if any."""
- # Extract nic if passed /dev/ethX
- if '/' in iface:
- iface = iface.split('/')[-1]
-
- if not exc_list:
- exc_list = []
-
- try:
- inet_num = getattr(netifaces, inet_type)
- except AttributeError:
- raise Exception("Unknown inet type '%s'" % str(inet_type))
-
- interfaces = netifaces.interfaces()
- if inc_aliases:
- ifaces = []
- for _iface in interfaces:
- if iface == _iface or _iface.split(':')[0] == iface:
- ifaces.append(_iface)
-
- if fatal and not ifaces:
- raise Exception("Invalid interface '%s'" % iface)
-
- ifaces.sort()
- else:
- if iface not in interfaces:
- if fatal:
- raise Exception("Interface '%s' not found " % (iface))
- else:
- return []
-
- else:
- ifaces = [iface]
-
- addresses = []
- for netiface in ifaces:
- net_info = netifaces.ifaddresses(netiface)
- if inet_num in net_info:
- for entry in net_info[inet_num]:
- if 'addr' in entry and entry['addr'] not in exc_list:
- addresses.append(entry['addr'])
-
- if fatal and not addresses:
- raise Exception("Interface '%s' doesn't have any %s addresses." %
- (iface, inet_type))
-
- return sorted(addresses)
-
-
-get_ipv4_addr = partial(get_iface_addr, inet_type='AF_INET')
-
-
-def get_iface_from_addr(addr):
- """Work out on which interface the provided address is configured."""
- for iface in netifaces.interfaces():
- addresses = netifaces.ifaddresses(iface)
- for inet_type in addresses:
- for _addr in addresses[inet_type]:
- _addr = _addr['addr']
- # link local
- ll_key = re.compile("(.+)%.*")
- raw = re.match(ll_key, _addr)
- if raw:
- _addr = raw.group(1)
-
- if _addr == addr:
- log("Address '%s' is configured on iface '%s'" %
- (addr, iface))
- return iface
-
- msg = "Unable to infer net iface on which '%s' is configured" % (addr)
- raise Exception(msg)
-
-
-def sniff_iface(f):
- """Ensure decorated function is called with a value for iface.
-
- If no iface provided, inject net iface inferred from unit private address.
- """
- def iface_sniffer(*args, **kwargs):
- if not kwargs.get('iface', None):
- kwargs['iface'] = get_iface_from_addr(unit_get('private-address'))
-
- return f(*args, **kwargs)
-
- return iface_sniffer
-
-
-@sniff_iface
-def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None,
- dynamic_only=True):
- """Get assigned IPv6 address for a given interface.
-
- Returns list of addresses found. If no address found, returns empty list.
-
- If iface is None, we infer the current primary interface by doing a reverse
- lookup on the unit private-address.
-
- We currently only support scope global IPv6 addresses i.e. non-temporary
- addresses. If no global IPv6 address is found, return the first one found
- in the ipv6 address list.
- """
- addresses = get_iface_addr(iface=iface, inet_type='AF_INET6',
- inc_aliases=inc_aliases, fatal=fatal,
- exc_list=exc_list)
-
- if addresses:
- global_addrs = []
- for addr in addresses:
- key_scope_link_local = re.compile("^fe80::..(.+)%(.+)")
- m = re.match(key_scope_link_local, addr)
- if m:
- eui_64_mac = m.group(1)
- iface = m.group(2)
- else:
- global_addrs.append(addr)
-
- if global_addrs:
- # Make sure any found global addresses are not temporary
- cmd = ['ip', 'addr', 'show', iface]
- out = subprocess.check_output(cmd).decode('UTF-8')
- if dynamic_only:
- key = re.compile("inet6 (.+)/[0-9]+ scope global dynamic.*")
- else:
- key = re.compile("inet6 (.+)/[0-9]+ scope global.*")
-
- addrs = []
- for line in out.split('\n'):
- line = line.strip()
- m = re.match(key, line)
- if m and 'temporary' not in line:
- # Return the first valid address we find
- for addr in global_addrs:
- if m.group(1) == addr:
- if not dynamic_only or \
- m.group(1).endswith(eui_64_mac):
- addrs.append(addr)
-
- if addrs:
- return addrs
-
- if fatal:
- raise Exception("Interface '%s' does not have a scope global "
- "non-temporary ipv6 address." % iface)
-
- return []
-
-
-def get_bridges(vnic_dir='/sys/devices/virtual/net'):
- """Return a list of bridges on the system."""
- b_regex = "%s/*/bridge" % vnic_dir
- return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_regex)]
-
-
-def get_bridge_nics(bridge, vnic_dir='/sys/devices/virtual/net'):
- """Return a list of nics comprising a given bridge on the system."""
- brif_regex = "%s/%s/brif/*" % (vnic_dir, bridge)
- return [x.split('/')[-1] for x in glob.glob(brif_regex)]
-
-
-def is_bridge_member(nic):
- """Check if a given nic is a member of a bridge."""
- for bridge in get_bridges():
- if nic in get_bridge_nics(bridge):
- return True
-
- return False
-
-
-def is_ip(address):
- """
- Returns True if address is a valid IP address.
- """
- try:
- # Test to see if already an IPv4 address
- socket.inet_aton(address)
- return True
- except socket.error:
- return False
-
-
-def ns_query(address):
- try:
- import dns.resolver
- except ImportError:
- apt_install('python-dnspython')
- import dns.resolver
-
- if isinstance(address, dns.name.Name):
- rtype = 'PTR'
- elif isinstance(address, six.string_types):
- rtype = 'A'
- else:
- return None
-
- answers = dns.resolver.query(address, rtype)
- if answers:
- return str(answers[0])
- return None
-
-
-def get_host_ip(hostname, fallback=None):
- """
- Resolves the IP for a given hostname, or returns
- the input if it is already an IP.
- """
- if is_ip(hostname):
- return hostname
-
- ip_addr = ns_query(hostname)
- if not ip_addr:
- try:
- ip_addr = socket.gethostbyname(hostname)
- except:
- log("Failed to resolve hostname '%s'" % (hostname),
- level=WARNING)
- return fallback
- return ip_addr
-
-
-def get_hostname(address, fqdn=True):
- """
- Resolves hostname for given IP, or returns the input
- if it is already a hostname.
- """
- if is_ip(address):
- try:
- import dns.reversename
- except ImportError:
- apt_install("python-dnspython")
- import dns.reversename
-
- rev = dns.reversename.from_address(address)
- result = ns_query(rev)
-
- if not result:
- try:
- result = socket.gethostbyaddr(address)[0]
- except:
- return None
- else:
- result = address
-
- if fqdn:
- # strip trailing .
- if result.endswith('.'):
- return result[:-1]
- else:
- return result
- else:
- return result.split('.')[0]
diff --git a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ovs/__init__.py b/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ovs/__init__.py
deleted file mode 100644
index 77e2db7..0000000
--- a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ovs/__init__.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright 2014-2015 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-''' Helpers for interacting with OpenvSwitch '''
-import subprocess
-import os
-from charmhelpers.core.hookenv import (
- log, WARNING
-)
-from charmhelpers.core.host import (
- service
-)
-
-
-def add_bridge(name):
- ''' Add the named bridge to openvswitch '''
- log('Creating bridge {}'.format(name))
- subprocess.check_call(["ovs-vsctl", "--", "--may-exist", "add-br", name])
-
-
-def del_bridge(name):
- ''' Delete the named bridge from openvswitch '''
- log('Deleting bridge {}'.format(name))
- subprocess.check_call(["ovs-vsctl", "--", "--if-exists", "del-br", name])
-
-
-def add_bridge_port(name, port, promisc=False):
- ''' Add a port to the named openvswitch bridge '''
- log('Adding port {} to bridge {}'.format(port, name))
- subprocess.check_call(["ovs-vsctl", "--", "--may-exist", "add-port",
- name, port])
- subprocess.check_call(["ip", "link", "set", port, "up"])
- if promisc:
- subprocess.check_call(["ip", "link", "set", port, "promisc", "on"])
- else:
- subprocess.check_call(["ip", "link", "set", port, "promisc", "off"])
-
-
-def del_bridge_port(name, port):
- ''' Delete a port from the named openvswitch bridge '''
- log('Deleting port {} from bridge {}'.format(port, name))
- subprocess.check_call(["ovs-vsctl", "--", "--if-exists", "del-port",
- name, port])
- subprocess.check_call(["ip", "link", "set", port, "down"])
- subprocess.check_call(["ip", "link", "set", port, "promisc", "off"])
-
-
-def set_manager(manager):
- ''' Set the controller for the local openvswitch '''
- log('Setting manager for local ovs to {}'.format(manager))
- subprocess.check_call(['ovs-vsctl', 'set-manager',
- 'ssl:{}'.format(manager)])
-
-
-CERT_PATH = '/etc/openvswitch/ovsclient-cert.pem'
-
-
-def get_certificate():
- ''' Read openvswitch certificate from disk '''
- if os.path.exists(CERT_PATH):
- log('Reading ovs certificate from {}'.format(CERT_PATH))
- with open(CERT_PATH, 'r') as cert:
- full_cert = cert.read()
- begin_marker = "-----BEGIN CERTIFICATE-----"
- end_marker = "-----END CERTIFICATE-----"
- begin_index = full_cert.find(begin_marker)
- end_index = full_cert.rfind(end_marker)
- if end_index == -1 or begin_index == -1:
- raise RuntimeError("Certificate does not contain valid begin"
- " and end markers.")
- full_cert = full_cert[begin_index:(end_index + len(end_marker))]
- return full_cert
- else:
- log('Certificate not found', level=WARNING)
- return None
-
-
-def full_restart():
- ''' Full restart and reload of openvswitch '''
- if os.path.exists('/etc/init/openvswitch-force-reload-kmod.conf'):
- service('start', 'openvswitch-force-reload-kmod')
- else:
- service('force-reload-kmod', 'openvswitch-switch')
diff --git a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ufw.py b/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ufw.py
deleted file mode 100644
index b65d963..0000000
--- a/charms/trusty/contrail-control/hooks/charmhelpers/contrib/network/ufw.py
+++ /dev/null
@@ -1,318 +0,0 @@
-# Copyright 2014-2015 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-This module contains helpers to add and remove ufw rules.
-
-Examples:
-
-- open SSH port for subnet 10.0.3.0/24:
-
- >>> from charmhelpers.contrib.network import ufw
- >>> ufw.enable()
- >>> ufw.grant_access(src='10.0.3.0/24', dst='any', port='22', proto='tcp')
-
-- open service by name as defined in /etc/services:
-
- >>> from charmhelpers.contrib.network import ufw
- >>> ufw.enable()
- >>> ufw.service('ssh', 'open')
-
-- close service by port number:
-
- >>> from charmhelpers.contrib.network import ufw
- >>> ufw.enable()
- >>> ufw.service('4949', 'close') # munin
-"""
-import re
-import os
-import subprocess
-
-from charmhelpers.core import hookenv
-from charmhelpers.core.kernel import modprobe, is_module_loaded
-
-__author__ = "Felipe Reyes <felipe.reyes@canonical.com>"
-
-
-class UFWError(Exception):
- pass
-
-
-class UFWIPv6Error(UFWError):
- pass
-
-
-def is_enabled():
- """
- Check if `ufw` is enabled
-
- :returns: True if ufw is enabled
- """
- output = subprocess.check_output(['ufw', 'status'],
- universal_newlines=True,
- env={'LANG': 'en_US',
- 'PATH': os.environ['PATH']})
-
- m = re.findall(r'^Status: active\n', output, re.M)
-
- return len(m) >= 1
-
-
-def is_ipv6_ok(soft_fail=False):
- """
- Check if IPv6 support is present and ip6tables functional
-
- :param soft_fail: If set to True and IPv6 support is broken, then reports
- that the host doesn't have IPv6 support, otherwise a
- UFWIPv6Error exception is raised.
- :returns: True if IPv6 is working, False otherwise
- """
-
- # do we have IPv6 in the machine?
- if os.path.isdir('/proc/sys/net/ipv6'):
- # is ip6tables kernel module loaded?
- if not is_module_loaded('ip6_tables'):
- # ip6tables support isn't complete, let's try to load it
- try:
- modprobe('ip6_tables')
- # great, we can load the module
- return True
- except subprocess.CalledProcessError as ex:
- hookenv.log("Couldn't load ip6_tables module: %s" % ex.output,
- level="WARN")
- # we are in a world where ip6tables isn't working
- if soft_fail:
- # so we inform that the machine doesn't have IPv6
- return False
- else:
- raise UFWIPv6Error("IPv6 firewall support broken")
- else:
- # the module is present :)
- return True
-
- else:
- # the system doesn't have IPv6
- return False
-
-
-def disable_ipv6():
- """
- Disable ufw IPv6 support in /etc/default/ufw
- """
- exit_code = subprocess.call(['sed', '-i', 's/IPV6=.*/IPV6=no/g',
- '/etc/default/ufw'])
- if exit_code == 0:
- hookenv.log('IPv6 support in ufw disabled', level='INFO')
- else:
- hookenv.log("Couldn't disable IPv6 support in ufw", level="ERROR")
- raise UFWError("Couldn't disable IPv6 support in ufw")
-
-
-def enable(soft_fail=False):
- """
- Enable ufw
-
- :param soft_fail: If set to True silently disables IPv6 support in ufw,
- otherwise a UFWIPv6Error exception is raised when IP6
- support is broken.
- :returns: True if ufw is successfully enabled
- """
- if is_enabled():
- return True
-
- if not is_ipv6_ok(soft_fail):
- disable_ipv6()
-
- output = subprocess.check_output(['ufw', 'enable'],
- universal_newlines=True,
- env={'LANG': 'en_US',
- 'PATH': os.environ['PATH']})
-
- m = re.findall('^Firewall is active and enabled on system startup\n',
- output, re.M)
- hookenv.log(output, level='DEBUG')
-
- if len(m) == 0:
- hookenv.log("ufw couldn't be enabled", level='WARN')
- return False
- else:
- hookenv.log("ufw enabled", level='INFO')
- return True
-
-
-def disable():
- """
- Disable ufw
-
- :returns: True if ufw is successfully disabled
- """
- if not is_enabled():
- return True
-
- output = subprocess.check_output(['ufw', 'disable'],
- universal_newlines=True,
- env={'LANG': 'en_US',
- 'PATH': os.environ['PATH']})
-
- m = re.findall(r'^Firewall stopped and disabled on system startup\n',
- output, re.M)
- hookenv.log(output, level='DEBUG')
-
- if len(m) == 0:
- hookenv.log("ufw couldn't be disabled", level='WARN')
- return False
- else:
- hookenv.log("ufw disabled", level='INFO')
- return True
-
-
-def default_policy(policy='deny', direction='incoming'):
- """
- Changes the default policy for traffic `direction`
-
- :param policy: allow, deny or reject
- :param direction: traffic direction, possible values: incoming, outgoing,
- routed
- """
- if policy not in ['allow', 'deny', 'reject']:
- raise UFWError(('Unknown policy %s, valid values: '
- 'allow, deny, reject') % policy)
-
- if direction not in ['incoming', 'outgoing', 'routed']:
- raise UFWError(('Unknown direction %s, valid values: '
- 'incoming, outgoing, routed') % direction)
-
- output = subprocess.check_output(['ufw', 'default', policy, direction],
- universal_newlines=True,
- env={'LANG': 'en_US',
- 'PATH': os.environ['PATH']})
- hookenv.log(output, level='DEBUG')
-
- m = re.findall("^Default %s policy changed to '%s'\n" % (direction,
- policy),
- output, re.M)
- if len(m) == 0:
- hookenv.log("ufw couldn't change the default policy to %s for %s"
- % (policy, direction), level='WARN')
- return False
- else:
- hookenv.log("ufw default policy for %s changed to %s"
- % (direction, policy), level='INFO')
- return True
-
-
-def modify_access(src, dst='any', port=None, proto=None, action='allow',
- index=None):
- """
- Grant access to an address or subnet
-
- :param src: address (e.g. 192.168.1.234) or subnet
- (e.g. 192.168.1.0/24).
- :param dst: destiny of the connection, if the machine has multiple IPs and
- connections to only one of those have to accepted this is the
- field has to be set.
- :param port: destiny port
- :param proto: protocol (tcp or udp)
- :param action: `allow` or `delete`
- :param index: if different from None the rule is inserted at the given
- `index`.
- """
- if not is_enabled():
- hookenv.log('ufw is disabled, skipping modify_access()', level='WARN')
- return
-
- if action == 'delete':
- cmd = ['ufw', 'delete', 'allow']
- elif index is not None:
- cmd = ['ufw', 'insert', str(index), action]
- else:
- cmd = ['ufw', action]
-
- if src is not None:
- cmd += ['from', src]
-
- if dst is not None:
- cmd += ['to', dst]
-
- if port is not None:
- cmd += ['port', str(port)]
-
- if proto is not None:
- cmd += ['proto', proto]
-
- hookenv.log('ufw {}: {}'.format(action, ' '.join(cmd)), level='DEBUG')
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
- (stdout, stderr) = p.communicate()
-
- hookenv.log(stdout, level='INFO')
-
- if p.returncode != 0:
- hookenv.log(stderr, level='ERROR')
- hookenv.log('Error running: {}, exit code: {}'.format(' '.join(cmd),
- p.returncode),
- level='ERROR')
-
-
-def grant_access(src, dst='any', port=None, proto=None, index=None):
- """
- Grant access to an address or subnet
-
- :param src: address (e.g. 192.168.1.234) or subnet
- (e.g. 192.168.1.0/24).
- :param dst: destiny of the connection, if the machine has multiple IPs and
- connections to only one of those have to accepted this is the
- field has to be set.
- :param port: destiny port
- :param proto: protocol (tcp or udp)
- :param index: if different from None the rule is inserted at the given
- `index`.
- """
- return modify_access(src, dst=dst, port=port, proto=proto, action='allow',
- index=index)
-
-
-def revoke_access(src, dst='any', port=None, proto=None):
- """
- Revoke access to an address or subnet
-
- :param src: address (e.g. 192.168.1.234) or subnet
- (e.g. 192.168.1.0/24).
- :param dst: destiny of the connection, if the machine has multiple IPs and
- connections to only one of those have to accepted this is the
- field has to be set.
- :param port: destiny port
- :param proto: protocol (tcp or udp)
- """
- return modify_access(src, dst=dst, port=port, proto=proto, action='delete')
-
-
-def service(name, action):
- """
- Open/close access to a service
-
- :param name: could be a service name defined in `/etc/services` or a port
- number.
- :param action: `open` or `close`
- """
- if action == 'open':
- subprocess.check_output(['ufw', 'allow', str(name)],
- universal_newlines=True)
- elif action == 'close':
- subprocess.check_output(['ufw', 'delete', 'allow', str(name)],
- universal_newlines=True)
- else:
- raise UFWError(("'{}' not supported, use 'allow' "
- "or 'delete'").format(action))