summaryrefslogtreecommitdiffstats
path: root/lib/python/apex/network_settings.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python/apex/network_settings.py')
-rw-r--r--lib/python/apex/network_settings.py360
1 files changed, 0 insertions, 360 deletions
diff --git a/lib/python/apex/network_settings.py b/lib/python/apex/network_settings.py
deleted file mode 100644
index 79b0a9d1..00000000
--- a/lib/python/apex/network_settings.py
+++ /dev/null
@@ -1,360 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 Feng Pan (fpan@redhat.com) and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import yaml
-import logging
-import ipaddress
-
-from copy import copy
-from .common import utils
-from . import ip_utils
-from .common.constants import (
- CONTROLLER,
- COMPUTE,
- ROLES,
- DOMAIN_NAME,
- DNS_SERVERS,
- NTP_SERVER,
- ADMIN_NETWORK,
- EXTERNAL_NETWORK,
- OPNFV_NETWORK_TYPES,
-)
-
-
-class NetworkSettings(dict):
- """
- This class parses APEX network settings yaml file into an object. It
- generates or detects all missing fields for deployment.
-
- The resulting object will be used later to generate network environment
- file as well as configuring post deployment networks.
-
- Currently the parsed object is dumped into a bash global definition file
- for deploy.sh consumption. This object will later be used directly as
- deployment script move to python.
- """
- def __init__(self, filename):
- init_dict = {}
- if isinstance(filename, str):
- with open(filename, 'r') as network_settings_file:
- init_dict = yaml.safe_load(network_settings_file)
- else:
- # assume input is a dict to build from
- init_dict = filename
- super().__init__(init_dict)
-
- if 'apex' in self:
- # merge two dics Nondestructively
- def merge(pri, sec):
- for key, val in sec.items():
- if key in pri:
- if isinstance(val, dict):
- merge(pri[key], val)
- # else
- # do not overwrite what's already there
- else:
- pri[key] = val
- # merge the apex specific config into the first class settings
- merge(self, copy(self['apex']))
-
- self.enabled_network_list = []
- self.nics = {COMPUTE: {}, CONTROLLER: {}}
- self.nics_specified = {COMPUTE: False, CONTROLLER: False}
- self._validate_input()
-
- def get_network(self, network):
- if network == EXTERNAL_NETWORK and self['networks'][network]:
- for net in self['networks'][network]:
- if 'public' in net:
- return net
-
- raise NetworkSettingsException("The external network, "
- "'public', should be defined "
- "when external networks are "
- "enabled")
- else:
- return self['networks'][network]
-
- def _validate_input(self):
- """
- Validates the network settings file and populates all fields.
-
- NetworkSettingsException will be raised if validation fails.
- """
- if not self['networks'].get(ADMIN_NETWORK, {}).get('enabled', False):
- raise NetworkSettingsException("You must enable admin network "
- "and configure it explicitly or "
- "use auto-detection")
-
- for network in OPNFV_NETWORK_TYPES:
- if network in self['networks']:
- _network = self.get_network(network)
- if _network.get('enabled', True):
- logging.info("{} enabled".format(network))
- self._config_required_settings(network)
- nicmap = _network['nic_mapping']
- self._validate_overcloud_nic_order(network)
- iface = nicmap[CONTROLLER]['members'][0]
- self._config_ip_range(network=network,
- interface=iface,
- ip_range='overcloud_ip_range',
- start_offset=21, end_offset=21)
- self.enabled_network_list.append(network)
- # TODO self._config_optional_settings(network)
- else:
- logging.info("{} disabled, will collapse with "
- "admin network".format(network))
- else:
- logging.info("{} is not in specified, will collapse with "
- "admin network".format(network))
-
- if 'dns-domain' not in self:
- self['domain_name'] = DOMAIN_NAME
- self['dns_servers'] = self.get('dns_nameservers', DNS_SERVERS)
- self['ntp_servers'] = self.get('ntp', NTP_SERVER)
-
- def _validate_overcloud_nic_order(self, network):
- """
- Detects if nic order is specified per profile (compute/controller)
- for network
-
- If nic order is specified in a network for a profile, it should be
- specified for every network with that profile other than admin network
-
- Duplicate nic names are also not allowed across different networks
-
- :param network: network to detect if nic order present
- :return: None
- """
- for role in ROLES:
- _network = self.get_network(network)
- _nicmap = _network.get('nic_mapping', {})
- _role = _nicmap.get(role, {})
- interfaces = _role.get('members', [])
-
- if interfaces:
- interface = interfaces[0]
- if not isinstance(_role.get('vlan', 'native'), int) and \
- any(y == interface for x, y in self.nics[role].items()):
- raise NetworkSettingsException(
- "Duplicate {} already specified for "
- "another network".format(interface))
- self.nics[role][network] = interface
- self.nics_specified[role] = True
- logging.info("{} nic order specified for network {"
- "}".format(role, network))
- else:
- raise NetworkSettingsException(
- "Interface members are not supplied for {} network "
- "for the {} role. Please add nic assignments"
- "".format(network, role))
-
- def _config_required_settings(self, network):
- """
- Configures either CIDR or bridged_interface setting
-
- cidr takes precedence if both cidr and bridged_interface are specified
- for a given network.
-
- When using bridged_interface, we will detect network setting on the
- given NIC in the system. The resulting config in settings object will
- be an ipaddress.network object, replacing the NIC name.
- """
- _network = self.get_network(network)
- # if vlan not defined then default it to native
- if network is not ADMIN_NETWORK:
- for role in ROLES:
- if 'vlan' not in _network['nic_mapping'][role]:
- _network['nic_mapping'][role]['vlan'] = 'native'
-
- cidr = _network.get('cidr')
-
- if cidr:
- cidr = ipaddress.ip_network(_network['cidr'])
- _network['cidr'] = cidr
- logging.info("{}_cidr: {}".format(network, cidr))
- elif 'installer_vm' in _network:
- ucloud_if_list = _network['installer_vm']['members']
- # If cidr is not specified, we need to know if we should find
- # IPv6 or IPv4 address on the interface
- ip = ipaddress.ip_address(_network['installer_vm']['ip'])
- nic_if = ip_utils.get_interface(ucloud_if_list[0], ip.version)
- if nic_if:
- logging.info("{}_bridged_interface: {}".
- format(network, nic_if))
- else:
- raise NetworkSettingsException(
- "Auto detection failed for {}: Unable to find valid "
- "ip for interface {}".format(network, ucloud_if_list[0]))
-
- else:
- raise NetworkSettingsException(
- "Auto detection failed for {}: either installer_vm "
- "members or cidr must be specified".format(network))
-
- # undercloud settings
- if network == ADMIN_NETWORK:
- provisioner_ip = _network['installer_vm']['ip']
- iface = _network['installer_vm']['members'][0]
- if not provisioner_ip:
- _network['installer_vm']['ip'] = self._gen_ip(network, 1)
- self._config_ip_range(network=network, interface=iface,
- ip_range='dhcp_range',
- start_offset=2, count=9)
- self._config_ip_range(network=network, interface=iface,
- ip_range='introspection_range',
- start_offset=11, count=9)
- elif network == EXTERNAL_NETWORK:
- provisioner_ip = _network['installer_vm']['ip']
- iface = _network['installer_vm']['members'][0]
- if not provisioner_ip:
- _network['installer_vm']['ip'] = self._gen_ip(network, 1)
- self._config_ip_range(network=network, interface=iface,
- ip_range='floating_ip_range',
- end_offset=2, count=20)
-
- gateway = _network['gateway']
- interface = _network['installer_vm']['ip']
- self._config_gateway(network, gateway, interface)
-
- def _config_ip_range(self, network, ip_range, interface=None,
- start_offset=None, end_offset=None, count=None):
- """
- Configures IP range for a given setting.
- If the setting is already specified, no change will be made.
- The spec for start_offset, end_offset and count are identical to
- ip_utils.get_ip_range.
- """
- _network = self.get_network(network)
- if ip_range not in _network:
- cidr = _network.get('cidr')
- _ip_range = ip_utils.get_ip_range(start_offset=start_offset,
- end_offset=end_offset,
- count=count,
- cidr=cidr,
- interface=interface)
- _network[ip_range] = _ip_range.split(',')
-
- logging.info("Config IP Range: {} {}".format(network, ip_range))
-
- def _gen_ip(self, network, offset):
- """
- Generate and ip offset within the given network
- """
- _network = self.get_network(network)
- cidr = _network.get('cidr')
- ip = ip_utils.get_ip(offset, cidr)
- logging.info("Config IP: {} {}".format(network, ip))
- return ip
-
- def _config_optional_settings(self, network):
- """
- Configures optional settings:
- - admin_network:
- - provisioner_ip
- - dhcp_range
- - introspection_range
- - public_network:
- - provisioner_ip
- - floating_ip_range
- - gateway
- """
- if network == ADMIN_NETWORK:
- self._config_ip(network, None, 'provisioner_ip', 1)
- self._config_ip_range(network=network,
- ip_range='dhcp_range',
- start_offset=2, count=9)
- self._config_ip_range(network=network,
- ip_range='introspection_range',
- start_offset=11, count=9)
- elif network == EXTERNAL_NETWORK:
- self._config_ip(network, None, 'provisioner_ip', 1)
- self._config_ip_range(network=network,
- ip_range='floating_ip_range',
- end_offset=2, count=20)
- self._config_gateway(network)
-
- def _config_gateway(self, network, gateway, interface):
- """
- Configures gateway setting for a given network.
-
- If cidr is specified, we always use the first address in the address
- space for gateway. Otherwise, we detect the system gateway.
- """
- _network = self.get_network(network)
- if not gateway:
- cidr = _network.get('cidr')
- if cidr:
- _gateway = ip_utils.get_ip(1, cidr)
- else:
- _gateway = ip_utils.find_gateway(interface)
-
- if _gateway:
- _network['gateway'] = _gateway
- else:
- raise NetworkSettingsException("Failed to set gateway")
-
- logging.info("Config Gateway: {} {}".format(network, gateway))
-
- def dump_bash(self, path=None):
- """
- Prints settings for bash consumption.
-
- If optional path is provided, bash string will be written to the file
- instead of stdout.
- """
- def flatten(name, obj, delim=','):
- """
- flatten lists to delim separated strings
- flatten dics to underscored key names and string values
- """
- if isinstance(obj, list):
- return "{}=\'{}\'\n".format(name,
- delim.join(map(lambda x: str(x),
- obj)))
- elif isinstance(obj, dict):
- flat_str = ''
- for k in obj:
- flat_str += flatten("{}_{}".format(name, k), obj[k])
- return flat_str
- elif isinstance(obj, str):
- return "{}='{}'\n".format(name, obj)
- else:
- return "{}={}\n".format(name, str(obj))
-
- bash_str = ''
- for network in self.enabled_network_list:
- _network = self.get_network(network)
- bash_str += flatten(network, _network)
- bash_str += flatten('enabled_network_list',
- self.enabled_network_list, ' ')
- bash_str += flatten('ip_addr_family', self.get_ip_addr_family())
- bash_str += flatten('dns_servers', self['dns_servers'], ' ')
- bash_str += flatten('domain_name', self['dns-domain'], ' ')
- bash_str += flatten('ntp_server', self['ntp_servers'][0], ' ')
- utils.write_str(bash_str, path)
-
- def get_ip_addr_family(self,):
- """
- Returns IP address family for current deployment.
-
- If any enabled network has IPv6 CIDR, the deployment is classified as
- IPv6.
- """
- return max([
- ipaddress.ip_network(self.get_network(n)['cidr']).version
- for n in self.enabled_network_list])
-
-
-class NetworkSettingsException(Exception):
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return self.value