aboutsummaryrefslogtreecommitdiffstats
path: root/functest/utils
diff options
context:
space:
mode:
Diffstat (limited to 'functest/utils')
-rw-r--r--functest/utils/config.py16
-rw-r--r--functest/utils/env.py17
-rw-r--r--functest/utils/functest_utils.py189
3 files changed, 192 insertions, 30 deletions
diff --git a/functest/utils/config.py b/functest/utils/config.py
index 61d8401c5..40414b88b 100644
--- a/functest/utils/config.py
+++ b/functest/utils/config.py
@@ -10,15 +10,16 @@ import six
from functest.utils import env
-class Config(object):
+class Config():
def __init__(self):
try:
- # pylint: disable=bad-continuation
with open(pkg_resources.resource_filename(
- 'functest', 'ci/config_functest.yaml')) as yfile:
+ 'functest', 'ci/config_functest.yaml'),
+ encoding='utf-8') as yfile:
self.functest_yaml = yaml.safe_load(yfile)
except Exception as error:
- raise Exception('Parse config failed: {}'.format(str(error)))
+ raise Exception(
+ f'Parse config failed: {str(error)}') from error
@staticmethod
def _merge_dicts(dict1, dict2):
@@ -34,7 +35,7 @@ class Config(object):
yield (k, dict2[k])
def patch_file(self, patch_file_path):
- with open(patch_file_path) as yfile:
+ with open(patch_file_path, encoding='utf-8') as yfile:
patch_file = yaml.safe_load(yfile)
for key in patch_file:
@@ -53,13 +54,14 @@ class Config(object):
@staticmethod
def _get_attr_further(attr_now, next): # pylint: disable=redefined-builtin
return attr_now if next == 'general' else (
- '{}_{}'.format(attr_now, next) if attr_now else next)
+ f'{attr_now}_{next}' if attr_now else next)
def fill(self):
try:
self._parse(None, self.functest_yaml)
except Exception as error:
- raise Exception('Parse config failed: {}'.format(str(error)))
+ raise Exception(
+ f'Parse config failed: {str(error)}') from error
CONF = Config()
diff --git a/functest/utils/env.py b/functest/utils/env.py
index cba935ab8..2e312726c 100644
--- a/functest/utils/env.py
+++ b/functest/utils/env.py
@@ -17,6 +17,7 @@ from xtesting.utils import env
INPUTS = {
'EXTERNAL_NETWORK': None,
'CI_LOOP': env.INPUTS['CI_LOOP'],
+ 'DEBUG': env.INPUTS['DEBUG'],
'DEPLOY_SCENARIO': env.INPUTS['DEPLOY_SCENARIO'],
'INSTALLER_TYPE': env.INPUTS['INSTALLER_TYPE'],
'SDN_CONTROLLER_IP': None,
@@ -28,11 +29,19 @@ INPUTS = {
'NODE_NAME': env.INPUTS['NODE_NAME'],
'POD_ARCH': None,
'TEST_DB_URL': env.INPUTS['TEST_DB_URL'],
- 'ENERGY_RECORDER_API_URL': env.INPUTS['ENERGY_RECORDER_API_URL'],
- 'ENERGY_RECORDER_API_USER': env.INPUTS['ENERGY_RECORDER_API_USER'],
- 'ENERGY_RECORDER_API_PASSWORD': env.INPUTS['ENERGY_RECORDER_API_PASSWORD'],
'VOLUME_DEVICE_NAME': 'vdb',
- 'NAMESERVER': '8.8.8.8'
+ 'IMAGE_PROPERTIES': '',
+ 'FLAVOR_EXTRA_SPECS': '',
+ 'NAMESERVER': '8.8.8.8',
+ 'NEW_USER_ROLE': 'Member',
+ 'USE_DYNAMIC_CREDENTIALS': 'True',
+ 'BLOCK_MIGRATION': 'False',
+ 'CLEAN_ORPHAN_SECURITY_GROUPS': 'True',
+ 'SKIP_DOWN_HYPERVISORS': 'False',
+ 'PUBLIC_ENDPOINT_ONLY': 'False',
+ 'DASHBOARD_URL': '',
+ 'VMTP_HYPERVISORS': '',
+ 'NO_TENANT_NETWORK': 'False'
}
diff --git a/functest/utils/functest_utils.py b/functest/utils/functest_utils.py
index b614af321..eec544489 100644
--- a/functest/utils/functest_utils.py
+++ b/functest/utils/functest_utils.py
@@ -11,10 +11,14 @@
from __future__ import print_function
import logging
+import os
import subprocess
import sys
import yaml
+from shade import _utils
+import six
+
LOGGER = logging.getLogger(__name__)
@@ -28,28 +32,26 @@ def execute_command_raise(cmd, info=False, error_msg="",
def execute_command(cmd, info=False, error_msg="",
verbose=True, output_file=None):
if not error_msg:
- error_msg = ("The command '%s' failed." % cmd)
- msg_exec = ("Executing command: '%s'" % cmd)
+ error_msg = f"The command '{cmd}' failed."
+ msg_exec = f"Executing command: '{cmd}'"
if verbose:
if info:
LOGGER.info(msg_exec)
else:
LOGGER.debug(msg_exec)
- popen = subprocess.Popen(
- cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- if output_file:
- ofd = open(output_file, "w")
- for line in iter(popen.stdout.readline, b''):
+ with subprocess.Popen(
+ cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) as popen:
if output_file:
- ofd.write(line)
- else:
- line = line.replace('\n', '')
- print (line)
- sys.stdout.flush()
- if output_file:
- ofd.close()
- popen.stdout.close()
- returncode = popen.wait()
+ with open(output_file, "w", encoding='utf-8') as ofd:
+ for line in iter(popen.stdout.readline, b''):
+ if output_file:
+ ofd.write(line.decode("utf-8"))
+ else:
+ line = line.decode("utf-8").replace('\n', '')
+ print(line)
+ sys.stdout.flush()
+ returncode = popen.wait()
if returncode != 0:
if verbose:
LOGGER.error(error_msg)
@@ -63,12 +65,161 @@ def get_parameter_from_yaml(parameter, yfile):
parameter must be given in string format with dots
Example: general.openstack.image_name
"""
- with open(yfile) as yfd:
+ with open(yfile, encoding='utf-8') as yfd:
file_yaml = yaml.safe_load(yfd)
value = file_yaml
for element in parameter.split("."):
value = value.get(element)
if value is None:
- raise ValueError("The parameter %s is not defined in"
- " %s" % (parameter, yfile))
+ raise ValueError(f"The parameter {parameter} is not defined in"
+ f" {yfile}")
return value
+
+
+def get_nova_version(cloud):
+ """ Get Nova API microversion
+
+ Returns:
+
+ - Nova API microversion
+ - None on operation error
+ """
+ # pylint: disable=protected-access
+ try:
+ request = cloud._compute_client.request("/", "GET")
+ LOGGER.debug('cloud._compute_client.request: %s', request)
+ version = request["version"]["version"]
+ major, minor = version.split('.')
+ LOGGER.debug('nova version: %s', (int(major), int(minor)))
+ return (int(major), int(minor))
+ except Exception: # pylint: disable=broad-except
+ LOGGER.exception("Cannot detect Nova version")
+ return None
+
+
+def get_openstack_version(cloud):
+ """ Detect OpenStack version via Nova API microversion
+
+ It follows `MicroversionHistory
+ <https://docs.openstack.org/nova/latest/reference/api-microversion-history.html>`_.
+
+ Returns:
+
+ - OpenStack release
+ - Unknown on operation error
+ """
+ # pylint: disable=too-many-branches
+ version = get_nova_version(cloud)
+ try:
+ assert version
+ if version > (2, 93):
+ osversion = "Master"
+ elif version > (2, 90):
+ osversion = "Zed"
+ elif version > (2, 88):
+ osversion = "Xena"
+ elif version > (2, 87):
+ osversion = "Wallaby"
+ elif version > (2, 79):
+ osversion = "Ussuri"
+ elif version > (2, 72):
+ osversion = "Train"
+ elif version > (2, 65):
+ osversion = "Stein"
+ elif version > (2, 60):
+ osversion = "Rocky"
+ elif version > (2, 53):
+ osversion = "Queens"
+ elif version > (2, 42):
+ osversion = "Pike"
+ elif version > (2, 38):
+ osversion = "Ocata"
+ elif version > (2, 25):
+ osversion = "Newton"
+ elif version > (2, 12):
+ osversion = "Mitaka"
+ elif version > (2, 3):
+ osversion = "Liberty"
+ elif version >= (2, 1):
+ osversion = "Kilo"
+ else:
+ osversion = "Unknown"
+ LOGGER.info('Detect OpenStack version: %s', osversion)
+ return osversion
+ except AssertionError:
+ LOGGER.exception("Cannot detect OpenStack version")
+ return "Unknown"
+
+
+def list_services(cloud):
+ # pylint: disable=protected-access
+ """Search Keystone services via $OS_INTERFACE.
+
+ It mainly conforms with `Shade
+ <https://docs.openstack.org/shade/latest>`_ but allows testing vs
+ public endpoints. It's worth mentioning that it doesn't support keystone
+ v2.
+
+ :returns: a list of ``munch.Munch`` containing the services description
+
+ :raises: ``OpenStackCloudException`` if something goes wrong during the
+ openstack API call.
+ """
+ url, key = '/services', 'services'
+ data = cloud._identity_client.get(
+ url, endpoint_filter={
+ 'interface': os.environ.get('OS_INTERFACE', 'public')},
+ error_message="Failed to list services")
+ services = cloud._get_and_munchify(key, data)
+ return _utils.normalize_keystone_services(services)
+
+
+def search_services(cloud, name_or_id=None, filters=None):
+ # pylint: disable=protected-access
+ """Search Keystone services ia $OS_INTERFACE.
+
+ It mainly conforms with `Shade
+ <https://docs.openstack.org/shade/latest>`_ but allows testing vs
+ public endpoints. It's worth mentioning that it doesn't support keystone
+ v2.
+
+ :param name_or_id: Name or id of the desired service.
+ :param filters: a dict containing additional filters to use. e.g.
+ {'type': 'network'}.
+
+ :returns: a list of ``munch.Munch`` containing the services description
+
+ :raises: ``OpenStackCloudException`` if something goes wrong during the
+ openstack API call.
+ """
+ services = list_services(cloud)
+ return _utils._filter_list(services, name_or_id, filters)
+
+
+def convert_dict_to_ini(value):
+ "Convert dict to oslo.conf input"
+ assert isinstance(value, dict)
+ return ",".join(f"{key}:{val}" for (key, val) in six.iteritems(value))
+
+
+def convert_list_to_ini(value):
+ "Convert list to oslo.conf input"
+ assert isinstance(value, list)
+ return ",".join(val for val in value)
+
+
+def convert_ini_to_dict(value):
+ "Convert oslo.conf input to dict"
+ assert isinstance(value, str)
+ try:
+ return dict((x.rsplit(':', 1) for x in value.split(',')))
+ except ValueError:
+ return {}
+
+
+def convert_ini_to_list(value):
+ "Convert list to oslo.conf input"
+ assert isinstance(value, str)
+ if not value:
+ return []
+ return list(value.split(','))