summaryrefslogtreecommitdiffstats
path: root/lib/python/apex/network_settings.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python/apex/network_settings.py')
-rw-r--r--lib/python/apex/network_settings.py316
1 files changed, 163 insertions, 153 deletions
diff --git a/lib/python/apex/network_settings.py b/lib/python/apex/network_settings.py
index 8e39afd6..11798085 100644
--- a/lib/python/apex/network_settings.py
+++ b/lib/python/apex/network_settings.py
@@ -10,20 +10,21 @@
import yaml
import logging
import ipaddress
+
+from copy import copy
+
from . import ip_utils
-from .common.utils import str2bool
+from .common import utils
from .common.constants import (
+ CONTROLLER,
+ COMPUTE,
+ ROLES,
+ DOMAIN_NAME,
+ DNS_SERVERS,
ADMIN_NETWORK,
- PRIVATE_NETWORK,
- PUBLIC_NETWORK,
- STORAGE_NETWORK,
- API_NETWORK,
+ EXTERNAL_NETWORK,
OPNFV_NETWORK_TYPES,
- DNS_SERVERS,
- DOMAIN_NAME,
- ROLES,
- COMPUTE,
- CONTROLLER)
+)
class NetworkSettings(dict):
@@ -46,7 +47,6 @@ class NetworkSettings(dict):
else:
# assume input is a dict to build from
init_dict = filename
-
super().__init__(init_dict)
if 'apex' in self:
@@ -69,46 +69,51 @@ class NetworkSettings(dict):
self.nics_specified = {COMPUTE: False, CONTROLLER: False}
self._validate_input()
+ def get_network(self, network):
+ if network == EXTERNAL_NETWORK and self['networks'][network]:
+ return self['networks'][network][0]
+ else:
+ return self['networks'][network]
+
def _validate_input(self):
"""
Validates the network settings file and populates all fields.
NetworkSettingsException will be raised if validation fails.
"""
- if ADMIN_NETWORK not in self or \
- not str2bool(self[ADMIN_NETWORK].get(
- 'enabled')):
- raise NetworkSettingsException("You must enable admin_network "
- "and configure it explicitly or "
- "use auto-detection")
- if self.network_isolation and \
- (PUBLIC_NETWORK not in self or not
- str2bool(self[PUBLIC_NETWORK].get(
- 'enabled'))):
- raise NetworkSettingsException("You must enable public_network "
+ if not self['networks'].get(ADMIN_NETWORK, {}).get('enabled', False):
+ raise NetworkSettingsException("You must enable admin network "
"and configure it explicitly or "
"use auto-detection")
for network in OPNFV_NETWORK_TYPES:
- if network in self:
- if str2bool(self[network].get('enabled')):
+ if network in self['networks']:
+ _network = self.get_network(network)
+ if _network.get('enabled', True):
logging.info("{} enabled".format(network))
self._config_required_settings(network)
+ if network == EXTERNAL_NETWORK:
+ nicmap = _network['nic_mapping']
+ else:
+ nicmap = _network['nic_mapping']
+ iface = nicmap[CONTROLLER]['members'][0]
self._config_ip_range(network=network,
- setting='usable_ip_range',
+ interface=iface,
+ ip_range='usable_ip_range',
start_offset=21, end_offset=21)
- self._config_optional_settings(network)
self.enabled_network_list.append(network)
self._validate_overcloud_nic_order(network)
+ # TODO self._config_optional_settings(network)
else:
logging.info("{} disabled, will collapse with "
- "admin_network".format(network))
+ "admin network".format(network))
else:
logging.info("{} is not in specified, will collapse with "
- "admin_network".format(network))
+ "admin network".format(network))
+ if 'dns-domain' not in self:
+ self['domain_name'] = DOMAIN_NAME
self['dns_servers'] = self.get('dns_servers', DNS_SERVERS)
- self['domain_name'] = self.get('domain_name', DOMAIN_NAME)
def _validate_overcloud_nic_order(self, network):
"""
@@ -116,42 +121,35 @@ class NetworkSettings(dict):
for network
If nic order is specified in a network for a profile, it should be
- specified for every network with that profile other than admin_network
+ specified for every network with that profile other than admin network
Duplicate nic names are also not allowed across different networks
:param network: network to detect if nic order present
:return: None
"""
-
for role in ROLES:
- interface = role+'_interface'
- nic_index = self.get_enabled_networks().index(network) + 1
- if interface in self[network]:
- if any(y == self[network][interface] for x, y in
- self.nics[role].items()):
- raise NetworkSettingsException("Duplicate {} already "
- "specified for "
- "another network"
- .format(self[network]
- [interface]))
- self.nics[role][network] = self[network][interface]
+ _network = self.get_network(network)
+ _nicmap = _network.get('nic_mapping', {})
+ _role = _nicmap.get(role, {})
+ interfaces = _role.get('members', [])
+
+ if interfaces:
+ interface = interfaces[0]
+ if type(_role.get('vlan', 'native')) is not int and \
+ any(y == interface for x, y in self.nics[role].items()):
+ raise NetworkSettingsException(
+ "Duplicate {} already specified for "
+ "another network".format(interface))
+ self.nics[role][network] = interface
self.nics_specified[role] = True
logging.info("{} nic order specified for network {"
"}".format(role, network))
- elif self.nics_specified[role]:
- logging.error("{} nic order not specified for network {"
- "}".format(role, network))
- raise NetworkSettingsException("Must specify {} for all "
- "enabled networks (other than "
- " admin) or not specify it for "
- "any".format(interface))
else:
- logging.info("{} nic order not specified for network {"
- "}. Will use logical default "
- "nic{}".format(interface, network, nic_index))
- self.nics[role][network] = 'nic' + str(nic_index)
- nic_index += 1
+ raise NetworkSettingsException(
+ "Interface members are not supplied for {} network "
+ "for the {} role. Please add nic assignments"
+ "".format(network, role))
def _config_required_settings(self, network):
"""
@@ -164,85 +162,93 @@ class NetworkSettings(dict):
given NIC in the system. The resulting config in settings object will
be an ipaddress.network object, replacing the NIC name.
"""
+ _network = self.get_network(network)
# if vlan not defined then default it to native
if network is not ADMIN_NETWORK:
- if 'vlan' not in self[network]:
- self[network]['vlan'] = 'native'
+ for role in ROLES:
+ if 'vlan' not in _network['nic_mapping'][role]:
+ _network['nic_mapping'][role]['vlan'] = 'native'
- cidr = self[network].get('cidr')
- nic_name = self[network].get('bridged_interface')
+ cidr = _network.get('cidr')
if cidr:
- cidr = ipaddress.ip_network(self[network]['cidr'])
- self[network]['cidr'] = cidr
+ cidr = ipaddress.ip_network(_network['cidr'])
+ _network['cidr'] = cidr
logging.info("{}_cidr: {}".format(network, cidr))
- return 0
- elif nic_name:
+ elif 'installer_vm' in _network:
+ ucloud_if_list = _network['installer_vm']['members']
# If cidr is not specified, we need to know if we should find
# IPv6 or IPv4 address on the interface
- if str2bool(self[network].get('ipv6')):
- address_family = 6
- else:
- address_family = 4
- nic_interface = ip_utils.get_interface(nic_name, address_family)
- if nic_interface:
- self[network]['bridged_interface'] = nic_interface
+ ip = ipaddress.ip_address(_network['installer_vm']['ip'])
+ nic_if = ip_utils.get_interface(ucloud_if_list[0], ip.version)
+ if nic_if:
+ ucloud_if_list = [nic_if]
logging.info("{}_bridged_interface: {}".
- format(network, nic_interface))
- return 0
+ format(network, nic_if))
else:
- raise NetworkSettingsException("Auto detection failed for {}: "
- "Unable to find valid ip for "
- "interface {}"
- .format(network, nic_name))
+ raise NetworkSettingsException(
+ "Auto detection failed for {}: Unable to find valid "
+ "ip for interface {}".format(network, ucloud_if_list[0]))
else:
- raise NetworkSettingsException("Auto detection failed for {}: "
- "either bridge_interface or cidr "
- "must be specified"
- .format(network))
+ raise NetworkSettingsException(
+ "Auto detection failed for {}: either installer_vm "
+ "members or cidr must be specified".format(network))
- def _config_ip_range(self, network, setting, start_offset=None,
- end_offset=None, count=None):
+ # undercloud settings
+ if network == ADMIN_NETWORK:
+ provisioner_ip = _network['installer_vm']['ip']
+ iface = _network['installer_vm']['members'][0]
+ if not provisioner_ip:
+ _network['installer_vm']['ip'] = self._gen_ip(network, 1)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='dhcp_range',
+ start_offset=2, count=9)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='introspection_range',
+ start_offset=11, count=9)
+ elif network == EXTERNAL_NETWORK:
+ provisioner_ip = _network['installer_vm']['ip']
+ iface = _network['installer_vm']['members'][0]
+ if not provisioner_ip:
+ _network['installer_vm']['ip'] = self._gen_ip(network, 1)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='floating_ip_range',
+ end_offset=2, count=20)
+
+ gateway = _network['gateway']
+ interface = _network['installer_vm']['ip']
+ self._config_gateway(network, gateway, interface)
+
+ def _config_ip_range(self, network, ip_range, interface=None,
+ start_offset=None, end_offset=None, count=None):
"""
Configures IP range for a given setting.
-
If the setting is already specified, no change will be made.
-
The spec for start_offset, end_offset and count are identical to
ip_utils.get_ip_range.
"""
- ip_range = self[network].get(setting)
- interface = self[network].get('bridged_interface')
-
- if not ip_range:
- cidr = self[network].get('cidr')
- ip_range = ip_utils.get_ip_range(start_offset=start_offset,
- end_offset=end_offset,
- count=count,
- cidr=cidr,
- interface=interface)
- self[network][setting] = ip_range
-
- logging.info("{}_{}: {}".format(network, setting, ip_range))
-
- def _config_ip(self, network, setting, offset):
+ _network = self.get_network(network)
+ if ip_range not in _network:
+ cidr = _network.get('cidr')
+ _ip_range = ip_utils.get_ip_range(start_offset=start_offset,
+ end_offset=end_offset,
+ count=count,
+ cidr=cidr,
+ interface=interface)
+ _network[ip_range] = _ip_range.split(',')
+
+ logging.info("Config IP Range: {} {}".format(network, ip_range))
+
+ def _gen_ip(self, network, offset):
"""
- Configures IP for a given setting.
-
- If the setting is already specified, no change will be made.
-
- The spec for offset is identical to ip_utils.get_ip
+ Generate and ip offset within the given network
"""
- ip = self[network].get(setting)
- interface = self[network].get('bridged_interface')
-
- if not ip:
- cidr = self[network].get('cidr')
- ip = ip_utils.get_ip(offset, cidr, interface)
- self[network][setting] = ip
-
- logging.info("{}_{}: {}".format(network, setting, ip))
+ _network = self.get_network(network)
+ cidr = _network.get('cidr')
+ ip = ip_utils.get_ip(offset, cidr)
+ logging.info("Config IP: {} {}".format(network, ip))
+ return ip
def _config_optional_settings(self, network):
"""
@@ -257,42 +263,41 @@ class NetworkSettings(dict):
- gateway
"""
if network == ADMIN_NETWORK:
- self._config_ip(network, 'provisioner_ip', 1)
- self._config_ip_range(network=network, setting='dhcp_range',
+ self._config_ip(network, None, 'provisioner_ip', 1)
+ self._config_ip_range(network=network,
+ ip_range='dhcp_range',
start_offset=2, count=9)
self._config_ip_range(network=network,
- setting='introspection_range',
+ ip_range='introspection_range',
start_offset=11, count=9)
- elif network == PUBLIC_NETWORK:
- self._config_ip(network, 'provisioner_ip', 1)
+ elif network == EXTERNAL_NETWORK:
+ self._config_ip(network, None, 'provisioner_ip', 1)
self._config_ip_range(network=network,
- setting='floating_ip_range',
+ ip_range='floating_ip_range',
end_offset=2, count=20)
self._config_gateway(network)
- def _config_gateway(self, network):
+ def _config_gateway(self, network, gateway, interface):
"""
Configures gateway setting for a given network.
If cidr is specified, we always use the first address in the address
space for gateway. Otherwise, we detect the system gateway.
"""
- gateway = self[network].get('gateway')
- interface = self[network].get('bridged_interface')
-
+ _network = self.get_network(network)
if not gateway:
- cidr = self[network].get('cidr')
+ cidr = _network.get('cidr')
if cidr:
- gateway = ip_utils.get_ip(1, cidr)
+ _gateway = ip_utils.get_ip(1, cidr)
else:
- gateway = ip_utils.find_gateway(interface)
+ _gateway = ip_utils.find_gateway(interface)
- if gateway:
- self[network]['gateway'] = gateway
+ if _gateway:
+ _network['gateway'] = _gateway
else:
raise NetworkSettingsException("Failed to set gateway")
- logging.info("{}_gateway: {}".format(network, gateway))
+ logging.info("Config Gateway: {} {}".format(network, gateway))
def dump_bash(self, path=None):
"""
@@ -301,45 +306,50 @@ class NetworkSettings(dict):
If optional path is provided, bash string will be written to the file
instead of stdout.
"""
+ def flatten(name, obj, delim=','):
+ """
+ flatten lists to delim separated strings
+ flatten dics to underscored key names and string values
+ """
+ if type(obj) is list:
+ return "{}=\'{}\'\n".format(name,
+ delim.join(map(lambda x: str(x),
+ obj)))
+ elif type(obj) is dict:
+ flat_str = ''
+ for k in obj:
+ flat_str += flatten("{}_{}".format(name, k), obj[k])
+ return flat_str
+ elif type(obj) is str:
+ return "{}='{}'\n".format(name, obj)
+ else:
+ return "{}={}\n".format(name, str(obj))
+
bash_str = ''
for network in self.enabled_network_list:
- for key, value in self[network].items():
- bash_str += "{}_{}={}\n".format(network, key, value)
- bash_str += "enabled_network_list='{}'\n" \
- .format(' '.join(self.enabled_network_list))
- bash_str += "ip_addr_family={}\n".format(self.get_ip_addr_family())
- dns_list = ""
- for dns_server in self['dns_servers']:
- dns_list = dns_list + "{} ".format(dns_server)
- dns_list = dns_list.strip()
- bash_str += "dns_servers=\'{}\'\n".format(dns_list)
- bash_str += "domain_name=\'{}\'\n".format(self['domain_name'])
+ _network = self.get_network(network)
+ bash_str += flatten(network, _network)
+ bash_str += flatten('enabled_network_list',
+ self.enabled_network_list, ' ')
+ bash_str += flatten('ip_addr_family', self.get_ip_addr_family())
+ bash_str += flatten('dns_servers', self['dns_servers'], ' ')
+ bash_str += flatten('domain_name', self['dns-domain'], ' ')
if path:
with open(path, 'w') as file:
file.write(bash_str)
else:
print(bash_str)
- def get_ip_addr_family(self):
+ def get_ip_addr_family(self,):
"""
Returns IP address family for current deployment.
If any enabled network has IPv6 CIDR, the deployment is classified as
IPv6.
"""
- for network in self.enabled_network_list:
- cidr = ipaddress.ip_network(self[network]['cidr'])
- if cidr.version == 6:
- return 6
-
- return 4
-
- def get_enabled_networks(self):
- """
- Getter for enabled network list
- :return: list of enabled networks
- """
- return self.enabled_network_list
+ return max([
+ ipaddress.ip_network(self.get_network(n)['cidr']).version
+ for n in self.enabled_network_list])
class NetworkSettingsException(Exception):