aboutsummaryrefslogtreecommitdiffstats
path: root/functest
diff options
context:
space:
mode:
Diffstat (limited to 'functest')
-rw-r--r--functest/ci/config_aarch64_patch.yaml8
-rw-r--r--functest/ci/config_functest.yaml38
-rw-r--r--functest/ci/download_images.sh31
-rwxr-xr-xfunctest/ci/prepare_env.py11
-rw-r--r--functest/ci/testcases.yaml1
-rw-r--r--functest/cli/commands/cli_env.py20
-rw-r--r--functest/cli/commands/cli_os.py15
-rw-r--r--functest/cli/commands/cli_testcase.py13
-rw-r--r--functest/cli/commands/cli_tier.py13
-rw-r--r--functest/core/pytest_suite_runner.py43
-rw-r--r--functest/core/testcase.py1
-rw-r--r--functest/opnfv_tests/openstack/rally/rally.py53
-rwxr-xr-xfunctest/opnfv_tests/openstack/refstack_client/refstack_client.py42
-rwxr-xr-xfunctest/opnfv_tests/openstack/refstack_client/tempest_conf.py8
-rw-r--r--functest/opnfv_tests/openstack/snaps/health_check.py7
-rw-r--r--functest/opnfv_tests/openstack/snaps/smoke.py10
-rw-r--r--functest/opnfv_tests/openstack/snaps/snaps_test_runner.py4
-rw-r--r--functest/opnfv_tests/openstack/tempest/conf_utils.py138
-rw-r--r--functest/opnfv_tests/openstack/tempest/tempest.py31
-rw-r--r--functest/opnfv_tests/openstack/vping/vping_base.py411
-rwxr-xr-xfunctest/opnfv_tests/openstack/vping/vping_ssh.py278
-rwxr-xr-xfunctest/opnfv_tests/openstack/vping/vping_userdata.py134
-rwxr-xr-xfunctest/opnfv_tests/sdn/odl/odl.py28
-rw-r--r--functest/opnfv_tests/sdn/onos/onos.py2
-rw-r--r--functest/opnfv_tests/vnf/ims/opera_ims.py8
-rw-r--r--functest/tests/unit/ci/test_prepare_env.py11
-rw-r--r--functest/tests/unit/ci/test_run_tests.py3
-rw-r--r--functest/tests/unit/ci/test_tier_builder.py3
-rw-r--r--functest/tests/unit/ci/test_tier_handler.py3
-rw-r--r--functest/tests/unit/cli/commands/test_cli_env.py24
-rw-r--r--functest/tests/unit/cli/commands/test_cli_os.py35
-rw-r--r--functest/tests/unit/cli/commands/test_cli_testcase.py19
-rw-r--r--functest/tests/unit/cli/commands/test_cli_tier.py23
-rw-r--r--functest/tests/unit/cli/test_cli_base.py3
-rw-r--r--functest/tests/unit/core/test_feature.py7
-rw-r--r--functest/tests/unit/core/test_pytest_suite_runner.py50
-rw-r--r--functest/tests/unit/core/test_testcase.py5
-rw-r--r--functest/tests/unit/core/test_vnf.py145
-rw-r--r--functest/tests/unit/energy/test_functest_energy.py3
-rw-r--r--functest/tests/unit/features/test_barometer.py3
-rw-r--r--functest/tests/unit/odl/test_odl.py61
-rw-r--r--functest/tests/unit/openstack/rally/test_rally.py30
-rw-r--r--functest/tests/unit/openstack/refstack_client/test_refstack_client.py26
-rw-r--r--functest/tests/unit/openstack/tempest/test_conf_utils.py39
-rw-r--r--functest/tests/unit/openstack/tempest/test_tempest.py29
-rw-r--r--functest/tests/unit/utils/test_decorators.py133
-rw-r--r--functest/tests/unit/utils/test_functest_utils.py53
-rw-r--r--functest/tests/unit/utils/test_openstack_clean.py3
-rw-r--r--functest/tests/unit/utils/test_openstack_snapshot.py3
-rw-r--r--functest/tests/unit/utils/test_openstack_tacker.py3
-rw-r--r--functest/tests/unit/utils/test_openstack_utils.py3
-rw-r--r--functest/tests/unit/vnf/ims/test_clearwater.py3
-rw-r--r--functest/tests/unit/vnf/ims/test_cloudify_ims.py3
-rw-r--r--functest/tests/unit/vnf/ims/test_ims_base.py3
-rw-r--r--functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py3
-rw-r--r--[-rwxr-xr-x]functest/utils/config.py14
-rw-r--r--[-rwxr-xr-x]functest/utils/constants.py19
-rw-r--r--functest/utils/decorators.py8
-rw-r--r--functest/utils/env.py9
-rw-r--r--functest/utils/functest_utils.py22
-rw-r--r--functest/utils/openstack_utils.py114
61 files changed, 1319 insertions, 947 deletions
diff --git a/functest/ci/config_aarch64_patch.yaml b/functest/ci/config_aarch64_patch.yaml
index b43b5a76..cd395ab8 100644
--- a/functest/ci/config_aarch64_patch.yaml
+++ b/functest/ci/config_aarch64_patch.yaml
@@ -16,5 +16,13 @@ os:
vping:
image_name: TestVM
+ odl_sfc:
+ image_base_url: "http://artifacts.opnfv.org/sfc/demo"
+ image_name: sfc_nsh_danube
+ image_file_name: sf_nsh_danube_arm64.img
+ image_initrd: sf_nsh_danube_arm64-initrd
+ image_kernel: sf_nsh_danube_arm64-kernel
+ image_format: ami
+ os_cmd_line: 'root=LABEL=cloudimg-rootfs ro'
doctor:
image_name: TestVM
diff --git a/functest/ci/config_functest.yaml b/functest/ci/config_functest.yaml
index 677c4856..9b796071 100644
--- a/functest/ci/config_functest.yaml
+++ b/functest/ci/config_functest.yaml
@@ -38,6 +38,7 @@ general:
functest_conf: /home/opnfv/functest/conf
functest_data: /home/opnfv/functest/data
ims_data: /home/opnfv/functest/data/ims/
+ functest_images: /home/opnfv/functest/images
rally_inst: /home/opnfv/.rally
repo_kingbird: /home/opnfv/repos/kingbird
refstack_client: /home/opnfv/repos/refstack-client
@@ -49,6 +50,8 @@ general:
image_name: Cirros-0.3.5
image_name_alt: Cirros-0.3.5-1
image_file_name: cirros-0.3.5-x86_64-disk.img
+ image_url: http://download.cirros-cloud.net/0.3.5/cirros-0.3.5-x86_64-disk.img
+ image_user: cirros
image_disk_format: qcow2
image_username: cirros
image_password: cubswin:)
@@ -71,14 +74,23 @@ general:
functest:
testcases_yaml: /home/opnfv/repos/functest/functest/ci/testcases.yaml
-healthcheck:
- disk_image: /home/opnfv/functest/data/cirros-0.3.5-x86_64-disk.img
- disk_format: qcow2
- wait_time: 60
-
snaps:
use_keystone: True
- use_floating_ips: False
+ use_floating_ips: True
+# images:
+# cirros:
+# disk_url: http://download.cirros-cloud.net/0.3.5/cirros-0.3.5-x86_64-disk.img
+ # ARM
+# disk_url: http://download.cirros-cloud.net/daily/20161201/cirros-d161201-aarch64-disk.img
+# kernel_url: http://download.cirros-cloud.net/daily/20161201/cirros-d161201-aarch64-kernel
+# ramdisk_url: http://download.cirros-cloud.net/daily/20161201/cirros-d161201-aarch64-initramfs
+# extra_properties:
+# os_command_line: root=/dev/vdb1 rw rootwait console=tty0 console=ttyS0 console=ttyAMA0
+# hw_video_model: vga
+# ubuntu:
+# disk_url: http://uec-images.ubuntu.com/releases/trusty/14.04/ubuntu-14.04-server-cloudimg-amd64-disk1.img
+# centos:
+# disk_url: http://cloud.centos.org/centos/7/images/CentOS-7-x86_64-GenericCloud.qcow2
vping:
ping_timeout: 200
@@ -92,12 +104,26 @@ vping:
router_name: vping-router
sg_name: vPing-sg
sg_desc: Security group for vPing test case
+ keypair_name: vPing-keypair
+ keypair_priv_file: /tmp/vPing-keypair
+ keypair_pub_file: /tmp/vPing-keypair.pub
+ vm_boot_timeout: 180
+ vm_delete_timeout: 100
+ vm_ssh_connect_timeout: 60
+ cleanup_objects: True
+ unique_names: True
onos_sfc:
image_base_url: http://artifacts.opnfv.org/sfc/demo
image_name: TestSfcVm
image_file_name: firewall_block_image.img
+odl_sfc:
+ image_base_url: "http://artifacts.opnfv.org/sfc/images"
+ image_name: sfc_nsh_danube
+ image_file_name: sfc_nsh_danube.qcow2
+ image_format: qcow2
+
tempest:
deployment_name: opnfv-tempest
identity:
diff --git a/functest/ci/download_images.sh b/functest/ci/download_images.sh
new file mode 100644
index 00000000..f3fdef2e
--- /dev/null
+++ b/functest/ci/download_images.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+CIRROS_REPO_URL=http://download.cirros-cloud.net
+CIRROS_AARCH64_TAG=161201
+CIRROS_X86_64_TAG=0.3.5
+
+RED='\033[1;31m'
+NC='\033[0m' # No Color
+
+function usage(){
+ echo -e "${RED}USAGE: $script <destination_folder>${NC}"
+ exit 0
+}
+
+script=`basename "$0"`
+IMAGES_FOLDER_DIR=$1
+
+if [[ -z $IMAGES_FOLDER_DIR ]]; then usage; fi;
+
+set -ex
+mkdir -p ${IMAGES_FOLDER_DIR}
+
+wget -nc ${CIRROS_REPO_URL}/${CIRROS_X86_64_TAG}/cirros-${CIRROS_X86_64_TAG}-x86_64-disk.img -P ${IMAGES_FOLDER_DIR}
+wget -nc ${CIRROS_REPO_URL}/${CIRROS_X86_64_TAG}/cirros-${CIRROS_X86_64_TAG}-x86_64-lxc.tar.gz -P ${IMAGES_FOLDER_DIR}
+wget -nc http://artifacts.opnfv.org/sdnvpn/ubuntu-16.04-server-cloudimg-amd64-disk1.img -P ${IMAGES_FOLDER_DIR}
+
+# Add 3rd-party images for aarch64, since Functest can be run on an x86 machine to test an aarch64 POD
+wget -nc ${CIRROS_REPO_URL}/daily/20${CIRROS_AARCH64_TAG}/cirros-d${CIRROS_AARCH64_TAG}-aarch64-disk.img -P ${IMAGES_FOLDER_DIR}
+wget -nc ${CIRROS_REPO_URL}/daily/20${CIRROS_AARCH64_TAG}/cirros-d${CIRROS_AARCH64_TAG}-aarch64-initramfs -P ${IMAGES_FOLDER_DIR}
+wget -nc ${CIRROS_REPO_URL}/daily/20${CIRROS_AARCH64_TAG}/cirros-d${CIRROS_AARCH64_TAG}-aarch64-kernel -P ${IMAGES_FOLDER_DIR}
+set +ex \ No newline at end of file
diff --git a/functest/ci/prepare_env.py b/functest/ci/prepare_env.py
index 9fd07958..8e17a4fc 100755
--- a/functest/ci/prepare_env.py
+++ b/functest/ci/prepare_env.py
@@ -156,8 +156,8 @@ def create_directories():
logger.info(" %s created." %
CONST.__getattribute__('dir_functest_conf'))
else:
- logger.debug(" %s already exists."
- % CONST.__getattribute__('dir_functest_conf'))
+ logger.debug(" %s already exists." %
+ CONST.__getattribute__('dir_functest_conf'))
if not os.path.exists(CONST.__getattribute__('dir_functest_data')):
os.makedirs(CONST.__getattribute__('dir_functest_data'))
@@ -166,6 +166,13 @@ def create_directories():
else:
logger.debug(" %s already exists." %
CONST.__getattribute__('dir_functest_data'))
+ if not os.path.exists(CONST.__getattribute__('dir_functest_images')):
+ os.makedirs(CONST.__getattribute__('dir_functest_images'))
+ logger.info(" %s created." %
+ CONST.__getattribute__('dir_functest_images'))
+ else:
+ logger.debug(" %s already exists." %
+ CONST.__getattribute__('dir_functest_images'))
def source_rc_file():
diff --git a/functest/ci/testcases.yaml b/functest/ci/testcases.yaml
index d98a2de2..10587f26 100644
--- a/functest/ci/testcases.yaml
+++ b/functest/ci/testcases.yaml
@@ -302,7 +302,6 @@ tiers:
-
case_name: bgpvpn
- enabled: false
project_name: sdnvpn
criteria: 100
blocking: false
diff --git a/functest/cli/commands/cli_env.py b/functest/cli/commands/cli_env.py
index 14ad01bf..f5ba4b34 100644
--- a/functest/cli/commands/cli_env.py
+++ b/functest/cli/commands/cli_env.py
@@ -28,7 +28,7 @@ class CliEnv(object):
"it again? [y|n]\n")
while True:
if answer.lower() in ["y", "yes"]:
- os.remove(CONST.env_active)
+ os.remove(CONST.__getattribute__('env_active'))
break
elif answer.lower() in ["n", "no"]:
return
@@ -36,19 +36,19 @@ class CliEnv(object):
answer = raw_input("Invalid answer. Please type [y|n]\n")
cmd = ("python %s/functest/ci/prepare_env.py start" %
- CONST.dir_repo_functest)
+ CONST.__getattribute__('dir_repo_functest'))
ft_utils.execute_command(cmd)
def show(self):
def _get_value(attr, default='Unknown'):
return attr if attr else default
- install_type = _get_value(CONST.INSTALLER_TYPE)
- installer_ip = _get_value(CONST.INSTALLER_IP)
+ install_type = _get_value(CONST.__getattribute__('INSTALLER_TYPE'))
+ installer_ip = _get_value(CONST.__getattribute__('INSTALLER_IP'))
installer_info = ("%s, %s" % (install_type, installer_ip))
- scenario = _get_value(CONST.DEPLOY_SCENARIO)
- node = _get_value(CONST.NODE_NAME)
- repo_h = git.Repo(CONST.dir_repo_functest).head
+ scenario = _get_value(CONST.__getattribute__('DEPLOY_SCENARIO'))
+ node = _get_value(CONST.__getattribute__('NODE_NAME'))
+ repo_h = git.Repo(CONST.__getattribute__('dir_repo_functest')).head
if repo_h.is_detached:
git_branch = 'detached from FETCH_HEAD'
git_hash = repo_h.commit.hexsha
@@ -56,8 +56,8 @@ class CliEnv(object):
branch = repo_h.reference
git_branch = branch.name
git_hash = branch.commit.hexsha
- is_debug = _get_value(CONST.CI_DEBUG, 'false')
- build_tag = CONST.BUILD_TAG
+ is_debug = _get_value(CONST.__getattribute__('CI_DEBUG'), 'false')
+ build_tag = CONST.__getattribute__('BUILD_TAG')
if build_tag is not None:
build_tag = build_tag.lstrip(
"jenkins-").lstrip("functest").lstrip("-")
@@ -84,7 +84,7 @@ class CliEnv(object):
def status(self, verbose=True):
ret_val = 0
- if not os.path.isfile(CONST.env_active):
+ if not os.path.isfile(CONST.__getattribute__('env_active')):
if verbose:
click.echo("Functest environment is not installed.\n")
ret_val = 1
diff --git a/functest/cli/commands/cli_os.py b/functest/cli/commands/cli_os.py
index f85f4041..e54eb423 100644
--- a/functest/cli/commands/cli_os.py
+++ b/functest/cli/commands/cli_os.py
@@ -21,11 +21,11 @@ import functest.utils.openstack_snapshot as os_snapshot
class CliOpenStack(object):
def __init__(self):
- self.os_auth_url = CONST.OS_AUTH_URL
+ self.os_auth_url = CONST.__getattribute__('OS_AUTH_URL')
self.endpoint_ip = None
self.endpoint_port = None
- self.openstack_creds = CONST.openstack_creds
- self.snapshot_file = CONST.openstack_snapshot_file
+ self.openstack_creds = CONST.__getattribute__('openstack_creds')
+ self.snapshot_file = CONST.__getattribute__('openstack_snapshot_file')
if self.os_auth_url:
self.endpoint_ip = self.os_auth_url.rsplit("/")[2].rsplit(":")[0]
self.endpoint_port = self.os_auth_url.rsplit("/")[2].rsplit(":")[1]
@@ -59,16 +59,16 @@ class CliOpenStack(object):
else:
answer = raw_input("Invalid answer. Please type [y|n]\n")
- installer_type = CONST.INSTALLER_TYPE
+ installer_type = CONST.__getattribute__('INSTALLER_TYPE')
if installer_type is None:
click.echo("The environment variable 'INSTALLER_TYPE' is not"
"defined. Please export it")
- installer_ip = CONST.INSTALLER_IP
+ installer_ip = CONST.__getattribute__('INSTALLER_IP')
if installer_ip is None:
click.echo("The environment variable 'INSTALLER_IP' is not"
"defined. Please export it")
cmd = ("%s/releng/utils/fetch_os_creds.sh -d %s -i %s -a %s"
- % (CONST.dir_repos,
+ % (CONST.__getattribute__('dir_repos'),
self.openstack_creds,
installer_type,
installer_ip))
@@ -78,7 +78,8 @@ class CliOpenStack(object):
def check(self):
self.ping_endpoint()
- cmd = CONST.dir_repo_functest + "/functest/ci/check_os.sh"
+ cmd = os.path.join(CONST.__getattribute__('dir_repo_functest'),
+ "functest/ci/check_os.sh")
ft_utils.execute_command(cmd, verbose=False)
def snapshot_create(self):
diff --git a/functest/cli/commands/cli_testcase.py b/functest/cli/commands/cli_testcase.py
index 6644a0c2..3d3f1cb3 100644
--- a/functest/cli/commands/cli_testcase.py
+++ b/functest/cli/commands/cli_testcase.py
@@ -22,9 +22,10 @@ import functest.utils.functest_vacation as vacation
class CliTestcase(object):
def __init__(self):
- self.tiers = tb.TierBuilder(CONST.INSTALLER_TYPE,
- CONST.DEPLOY_SCENARIO,
- CONST.functest_testcases_yaml)
+ self.tiers = tb.TierBuilder(
+ CONST.__getattribute__('INSTALLER_TYPE'),
+ CONST.__getattribute__('DEPLOY_SCENARIO'),
+ CONST.__getattribute__('functest_testcases_yaml'))
def list(self):
summary = ""
@@ -52,12 +53,14 @@ class CliTestcase(object):
if testname == 'vacation':
vacation.main()
- elif not os.path.isfile(CONST.env_active):
+ elif not os.path.isfile(CONST.__getattribute__('env_active')):
click.echo("Functest environment is not ready. "
"Run first 'functest env prepare'")
else:
tests = testname.split(",")
for test in tests:
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, flags, test))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ flags, test))
ft_utils.execute_command(cmd)
diff --git a/functest/cli/commands/cli_tier.py b/functest/cli/commands/cli_tier.py
index 012b11d0..531f0ff9 100644
--- a/functest/cli/commands/cli_tier.py
+++ b/functest/cli/commands/cli_tier.py
@@ -21,9 +21,10 @@ import functest.utils.functest_utils as ft_utils
class CliTier(object):
def __init__(self):
- self.tiers = tb.TierBuilder(CONST.INSTALLER_TYPE,
- CONST.DEPLOY_SCENARIO,
- CONST.functest_testcases_yaml)
+ self.tiers = tb.TierBuilder(
+ CONST.__getattribute__('INSTALLER_TYPE'),
+ CONST.__getattribute__('DEPLOY_SCENARIO'),
+ CONST.__getattribute__('functest_testcases_yaml'))
def list(self):
summary = ""
@@ -62,10 +63,12 @@ class CliTier(object):
if report:
flags += "-r "
- if not os.path.isfile(CONST.env_active):
+ if not os.path.isfile(CONST.__getattribute__('env_active')):
click.echo("Functest environment is not ready. "
"Run first 'functest env prepare'")
else:
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, flags, tiername))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ flags, tiername))
ft_utils.execute_command(cmd)
diff --git a/functest/core/pytest_suite_runner.py b/functest/core/pytest_suite_runner.py
index 21edc187..a6e47660 100644
--- a/functest/core/pytest_suite_runner.py
+++ b/functest/core/pytest_suite_runner.py
@@ -5,27 +5,46 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
-import testcase as base
-import unittest
+# pylint: disable=missing-docstring
+
+import logging
import time
+import unittest
+import six
-class PyTestSuiteRunner(base.TestCase):
+from functest.core import testcase
+
+
+class PyTestSuiteRunner(testcase.TestCase):
"""
This superclass is designed to execute pre-configured unittest.TestSuite()
objects
"""
+
def __init__(self, **kwargs):
super(PyTestSuiteRunner, self).__init__(**kwargs)
self.suite = None
- self.logger = None
+ self.logger = logging.getLogger(__name__)
def run(self, **kwargs):
"""
Starts test execution from the functest framework
"""
+ try:
+ name = kwargs["name"]
+ try:
+ self.suite = unittest.TestLoader().loadTestsFromName(name)
+ except ImportError:
+ self.logger.error("Can not import %s", name)
+ return testcase.TestCase.EX_RUN_ERROR
+ except KeyError:
+ pass
self.start_time = time.time()
- result = unittest.TextTestRunner(verbosity=2).run(self.suite)
+ stream = six.StringIO()
+ result = unittest.TextTestRunner(
+ stream=stream, verbosity=2).run(self.suite)
+ self.logger.debug("\n\n%s", stream.getvalue())
self.stop_time = time.time()
if result.errors:
@@ -45,14 +64,14 @@ class PyTestSuiteRunner(base.TestCase):
# we shall distinguish Execution Error from FAIL results
# TestCase.EX_RUN_ERROR means that the test case was not run
# not that it was run but the result was FAIL
- exit_code = base.TestCase.EX_OK
- if ((result.errors and len(result.errors) > 0)
- or (result.failures and len(result.failures) > 0)):
- self.logger.info("%s FAILED" % self.case_name)
- self.result = 'FAIL'
+ exit_code = testcase.TestCase.EX_OK
+ if ((result.errors and len(result.errors) > 0) or
+ (result.failures and len(result.failures) > 0)):
+ self.logger.info("%s FAILED", self.case_name)
+ self.result = 0
else:
- self.logger.info("%s OK" % self.case_name)
- self.result = 'PASS'
+ self.logger.info("%s OK", self.case_name)
+ self.result = 100
self.details = {}
return exit_code
diff --git a/functest/core/testcase.py b/functest/core/testcase.py
index d8b63ef2..43161525 100644
--- a/functest/core/testcase.py
+++ b/functest/core/testcase.py
@@ -94,6 +94,7 @@ class TestCase(object):
"""
try:
assert self.criteria
+ assert self.result is not None
if (not isinstance(self.result, str) and
not isinstance(self.criteria, str)):
if self.result >= self.criteria:
diff --git a/functest/opnfv_tests/openstack/rally/rally.py b/functest/opnfv_tests/openstack/rally/rally.py
index f762383a..86ec3558 100644
--- a/functest/opnfv_tests/openstack/rally/rally.py
+++ b/functest/opnfv_tests/openstack/rally/rally.py
@@ -8,6 +8,8 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
+from __future__ import division
+
import json
import logging
import os
@@ -20,7 +22,6 @@ import yaml
from functest.core import testcase
from functest.utils.constants import CONST
-import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
logger = logging.getLogger(__name__)
@@ -29,14 +30,17 @@ logger = logging.getLogger(__name__)
class RallyBase(testcase.TestCase):
TESTS = ['authenticate', 'glance', 'cinder', 'heat', 'keystone',
'neutron', 'nova', 'quotas', 'requests', 'vm', 'all']
- GLANCE_IMAGE_NAME = CONST.openstack_image_name
- GLANCE_IMAGE_FILENAME = CONST.openstack_image_file_name
- GLANCE_IMAGE_PATH = os.path.join(CONST.dir_functest_data,
- GLANCE_IMAGE_FILENAME)
- GLANCE_IMAGE_FORMAT = CONST.openstack_image_disk_format
+ GLANCE_IMAGE_NAME = CONST.__getattribute__('openstack_image_name')
+ GLANCE_IMAGE_FILENAME = CONST.__getattribute__('openstack_image_file_name')
+ GLANCE_IMAGE_PATH = os.path.join(
+ CONST.__getattribute__('dir_functest_images'),
+ GLANCE_IMAGE_FILENAME)
+ GLANCE_IMAGE_FORMAT = CONST.__getattribute__('openstack_image_disk_format')
FLAVOR_NAME = "m1.tiny"
- RALLY_DIR = os.path.join(CONST.dir_repo_functest, CONST.dir_rally)
+ RALLY_DIR = os.path.join(
+ CONST.__getattribute__('dir_repo_functest'),
+ CONST.__getattribute__('dir_rally'))
RALLY_SCENARIO_DIR = os.path.join(RALLY_DIR, "scenario")
TEMPLATE_DIR = os.path.join(RALLY_SCENARIO_DIR, "templates")
SUPPORT_DIR = os.path.join(RALLY_SCENARIO_DIR, "support")
@@ -44,17 +48,17 @@ class RallyBase(testcase.TestCase):
TENANTS_AMOUNT = 3
ITERATIONS_AMOUNT = 10
CONCURRENCY = 4
- RESULTS_DIR = os.path.join(CONST.dir_results, 'rally')
- TEMPEST_CONF_FILE = os.path.join(CONST.dir_results,
+ RESULTS_DIR = os.path.join(CONST.__getattribute__('dir_results'), 'rally')
+ TEMPEST_CONF_FILE = os.path.join(CONST.__getattribute__('dir_results'),
'tempest/tempest.conf')
BLACKLIST_FILE = os.path.join(RALLY_DIR, "blacklist.txt")
TEMP_DIR = os.path.join(RALLY_DIR, "var")
CINDER_VOLUME_TYPE_NAME = "volume_test"
- RALLY_PRIVATE_NET_NAME = CONST.rally_network_name
- RALLY_PRIVATE_SUBNET_NAME = CONST.rally_subnet_name
- RALLY_PRIVATE_SUBNET_CIDR = CONST.rally_subnet_cidr
- RALLY_ROUTER_NAME = CONST.rally_router_name
+ RALLY_PRIVATE_NET_NAME = CONST.__getattribute__('rally_network_name')
+ RALLY_PRIVATE_SUBNET_NAME = CONST.__getattribute__('rally_subnet_name')
+ RALLY_PRIVATE_SUBNET_CIDR = CONST.__getattribute__('rally_subnet_cidr')
+ RALLY_ROUTER_NAME = CONST.__getattribute__('rally_router_name')
def __init__(self, **kwargs):
super(RallyBase, self).__init__(**kwargs)
@@ -96,7 +100,7 @@ class RallyBase(testcase.TestCase):
task_args['netid'] = ''
# get keystone auth endpoint
- task_args['request_url'] = CONST.OS_AUTH_URL or ''
+ task_args['request_url'] = CONST.__getattribute__('OS_AUTH_URL') or ''
return task_args
@@ -182,8 +186,8 @@ class RallyBase(testcase.TestCase):
with open(RallyBase.BLACKLIST_FILE, 'r') as black_list_file:
black_list_yaml = yaml.safe_load(black_list_file)
- installer_type = CONST.INSTALLER_TYPE
- deploy_scenario = CONST.DEPLOY_SCENARIO
+ installer_type = CONST.__getattribute__('INSTALLER_TYPE')
+ deploy_scenario = CONST.__getattribute__('DEPLOY_SCENARIO')
if (bool(installer_type) * bool(deploy_scenario)):
if 'scenario' in black_list_yaml.keys():
for item in black_list_yaml['scenario']:
@@ -480,11 +484,12 @@ class RallyBase(testcase.TestCase):
total_duration_str2 = "{0:<10}".format(total_duration_str)
total_nb_tests_str = "{0:<13}".format(total_nb_tests)
- if len(self.summary):
- success_rate = total_success / len(self.summary)
- else:
- success_rate = 100
- success_rate = "{:0.2f}".format(success_rate)
+ try:
+ self.result = total_success / len(self.summary)
+ except ZeroDivisionError:
+ self.result = 100
+
+ success_rate = "{:0.2f}".format(self.result)
success_rate_str = "{0:<10}".format(str(success_rate) + '%')
report += ("+===================+============"
"+===============+===========+")
@@ -500,12 +505,10 @@ class RallyBase(testcase.TestCase):
'nb tests': total_nb_tests,
'nb success': success_rate}})
- self.result = ft_utils.check_success_rate(
- self.case_name, success_rate)
self.details = payload
- logger.info("Rally '%s' success_rate is %s%%, is marked as %s"
- % (self.case_name, success_rate, self.result))
+ logger.info("Rally '%s' success_rate is %s%%"
+ % (self.case_name, success_rate))
def _clean_up(self):
if self.volume_type:
diff --git a/functest/opnfv_tests/openstack/refstack_client/refstack_client.py b/functest/opnfv_tests/openstack/refstack_client/refstack_client.py
index ebae4b86..5f1f3a1d 100755
--- a/functest/opnfv_tests/openstack/refstack_client/refstack_client.py
+++ b/functest/opnfv_tests/openstack/refstack_client/refstack_client.py
@@ -5,6 +5,10 @@
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
+
+from __future__ import division
+
+
import argparse
import logging
import os
@@ -29,9 +33,9 @@ class RefstackClient(testcase.TestCase):
if "case_name" not in kwargs:
kwargs["case_name"] = "refstack_defcore"
super(RefstackClient, self).__init__(**kwargs)
- self.FUNCTEST_TEST = CONST.dir_functest_test
- self.CONF_PATH = CONST.refstack_tempest_conf_path
- self.DEFCORE_LIST = CONST.refstack_defcore_list
+ self.FUNCTEST_TEST = CONST.__getattribute__('dir_functest_test')
+ self.CONF_PATH = CONST.__getattribute__('refstack_tempest_conf_path')
+ self.DEFCORE_LIST = CONST.__getattribute__('refstack_defcore_list')
self.confpath = os.path.join(self.FUNCTEST_TEST,
self.CONF_PATH)
self.defcorelist = os.path.join(self.FUNCTEST_TEST,
@@ -41,7 +45,7 @@ class RefstackClient(testcase.TestCase):
cmd = ("cd {0};"
". .venv/bin/activate;"
- "cd -;".format(CONST.dir_refstack_client))
+ "cd -;".format(CONST.__getattribute__('dir_refstack_client')))
ft_utils.execute_command(cmd)
def run_defcore(self, conf, testlist):
@@ -49,7 +53,7 @@ class RefstackClient(testcase.TestCase):
cmd = ("cd {0};"
"./refstack-client test -c {1} -v --test-list {2};"
- "cd -;".format(CONST.dir_refstack_client,
+ "cd -;".format(CONST.__getattribute__('dir_refstack_client'),
conf,
testlist))
ft_utils.execute_command(cmd)
@@ -59,16 +63,16 @@ class RefstackClient(testcase.TestCase):
cmd = ("cd {0};"
"./refstack-client test -c {1} -v --test-list {2};"
- "cd -;".format(CONST.dir_refstack_client,
+ "cd -;".format(CONST.__getattribute__('dir_refstack_client'),
self.confpath,
self.defcorelist))
logger.info("Starting Refstack_defcore test case: '%s'." % cmd)
header = ("Refstack environment:\n"
" SUT: %s\n Scenario: %s\n Node: %s\n Date: %s\n" %
- (CONST.INSTALLER_TYPE,
- CONST.DEPLOY_SCENARIO,
- CONST.NODE_NAME,
+ (CONST.__getattribute__('INSTALLER_TYPE'),
+ CONST.__getattribute__('DEPLOY_SCENARIO'),
+ CONST.__getattribute__('NODE_NAME'),
time.strftime("%a %b %d %H:%M:%S %Z %Y")))
f_stdout = open(
@@ -123,7 +127,11 @@ class RefstackClient(testcase.TestCase):
skipped_testcases += match + ", "
num_executed = int(num_tests) - int(num_skipped)
- success_rate = 100 * int(num_success) / int(num_executed)
+
+ try:
+ self.result = 100 * int(num_success) / int(num_executed)
+ except ZeroDivisionError:
+ logger.error("No test has been executed")
self.details = {"tests": int(num_tests),
"failures": int(num_failures),
@@ -131,12 +139,10 @@ class RefstackClient(testcase.TestCase):
"errors": failed_testcases,
"skipped": skipped_testcases}
except Exception:
- success_rate = 0
+ self.result = 0
- self.result = ft_utils.check_success_rate(
- self.case_name, success_rate)
- logger.info("Testcase %s success_rate is %s%%, is marked as %s"
- % (self.case_name, success_rate, self.result))
+ logger.info("Testcase %s success_rate is %s%%"
+ % (self.case_name, self.result))
def run(self):
'''used for functest command line,
@@ -196,9 +202,9 @@ class RefstackClient(testcase.TestCase):
class RefstackClientParser(object):
def __init__(self):
- self.FUNCTEST_TEST = CONST.dir_functest_test
- self.CONF_PATH = CONST.refstack_tempest_conf_path
- self.DEFCORE_LIST = CONST.refstack_defcore_list
+ self.FUNCTEST_TEST = CONST.__getattribute__('dir_functest_test')
+ self.CONF_PATH = CONST.__getattribute__('refstack_tempest_conf_path')
+ self.DEFCORE_LIST = CONST.__getattribute__('refstack_defcore_list')
self.confpath = os.path.join(self.FUNCTEST_TEST,
self.CONF_PATH)
self.defcorelist = os.path.join(self.FUNCTEST_TEST,
diff --git a/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py b/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py
index 5c04253c..fbaad589 100755
--- a/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py
+++ b/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py
@@ -24,12 +24,14 @@ class TempestConf(object):
self.DEPLOYMENT_ID = conf_utils.get_verifier_deployment_id()
self.DEPLOYMENT_DIR = conf_utils.get_verifier_deployment_dir(
self.VERIFIER_ID, self.DEPLOYMENT_ID)
- self.confpath = os.path.join(CONST.dir_functest_test,
- CONST.refstack_tempest_conf_path)
+ self.confpath = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ CONST.__getattribute__('refstack_tempest_conf_path'))
def generate_tempestconf(self):
try:
- openstack_utils.source_credentials(CONST.openstack_creds)
+ openstack_utils.source_credentials(
+ CONST.__getattribute__('openstack_creds'))
img_flavor_dict = conf_utils.create_tempest_resources(
use_custom_images=True, use_custom_flavors=True)
conf_utils.configure_tempest_defcore(
diff --git a/functest/opnfv_tests/openstack/snaps/health_check.py b/functest/opnfv_tests/openstack/snaps/health_check.py
index c057eb2b..0daddcdd 100644
--- a/functest/opnfv_tests/openstack/snaps/health_check.py
+++ b/functest/opnfv_tests/openstack/snaps/health_check.py
@@ -23,14 +23,15 @@ class HealthCheck(SnapsTestRunner):
"""
def __init__(self, **kwargs):
if "case_name" not in kwargs:
- kwargs["case_name"] = "snaps_health_check"
+ kwargs["case_name"] = "snaps_images_cirros"
super(HealthCheck, self).__init__(**kwargs)
self.suite = unittest.TestSuite()
image_custom_config = None
- if hasattr(CONST, 'snaps_health_check'):
- image_custom_config = CONST.__getattribute__('snaps_health_check')
+
+ if hasattr(CONST, 'snaps_images_cirros'):
+ image_custom_config = CONST.__getattribute__('snaps_images_cirros')
self.suite.addTest(
OSIntegrationTestCase.parameterize(
SimpleHealthCheck, os_creds=self.os_creds,
diff --git a/functest/opnfv_tests/openstack/snaps/smoke.py b/functest/opnfv_tests/openstack/snaps/smoke.py
index 2c6fc255..d9f95e90 100644
--- a/functest/opnfv_tests/openstack/snaps/smoke.py
+++ b/functest/opnfv_tests/openstack/snaps/smoke.py
@@ -28,11 +28,9 @@ class SnapsSmoke(SnapsTestRunner):
self.suite = unittest.TestSuite()
- # The snaps smoke test uses the same config as the
- # snaps_health_check suite, so reuse it here
- image_custom_config = None
- if hasattr(CONST, 'snaps_health_check'):
- image_custom_config = CONST.__getattribute__('snaps_health_check')
+ image_config = None
+ if hasattr(CONST, 'snaps_images_cirros'):
+ image_config = CONST.__getattribute__('snaps_images_cirros')
# Tests requiring floating IPs leverage files contained within the
# SNAPS repository and are found relative to that path
@@ -47,5 +45,5 @@ class SnapsSmoke(SnapsTestRunner):
ext_net_name=self.ext_net_name,
use_keystone=self.use_keystone,
flavor_metadata=self.flavor_metadata,
- image_metadata=image_custom_config,
+ image_metadata=image_config,
use_floating_ips=self.use_fip)
diff --git a/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py b/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py
index 2a1b3a39..94b97551 100644
--- a/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py
+++ b/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py
@@ -18,9 +18,7 @@ from snaps.openstack.tests import openstack_tests
class SnapsTestRunner(PyTestSuiteRunner):
"""
- This test executes the SNAPS Python Test case SimpleHealthCheck which
- creates a VM with a single port with an IPv4 address that is assigned by
- DHCP. This test then validates the expected IP with the actual
+ This test executes the SNAPS Python Tests
"""
def __init__(self, **kwargs):
super(SnapsTestRunner, self).__init__(**kwargs)
diff --git a/functest/opnfv_tests/openstack/tempest/conf_utils.py b/functest/opnfv_tests/openstack/tempest/conf_utils.py
index 54f7428c..556a41d4 100644
--- a/functest/opnfv_tests/openstack/tempest/conf_utils.py
+++ b/functest/opnfv_tests/openstack/tempest/conf_utils.py
@@ -21,11 +21,12 @@ import functest.utils.openstack_utils as os_utils
IMAGE_ID_ALT = None
FLAVOR_ID_ALT = None
-REPO_PATH = CONST.dir_repo_functest
-GLANCE_IMAGE_PATH = os.path.join(CONST.dir_functest_data,
- CONST.openstack_image_file_name)
-TEMPEST_TEST_LIST_DIR = CONST.dir_tempest_cases
-TEMPEST_RESULTS_DIR = os.path.join(CONST.dir_results,
+REPO_PATH = CONST.__getattribute__('dir_repo_functest')
+GLANCE_IMAGE_PATH = os.path.join(
+ CONST.__getattribute__('dir_functest_images'),
+ CONST.__getattribute__('openstack_image_file_name'))
+TEMPEST_TEST_LIST_DIR = CONST.__getattribute__('dir_tempest_cases')
+TEMPEST_RESULTS_DIR = os.path.join(CONST.__getattribute__('dir_results'),
'tempest')
TEMPEST_CUSTOM = os.path.join(REPO_PATH, TEMPEST_TEST_LIST_DIR,
'test_list.txt')
@@ -35,11 +36,11 @@ TEMPEST_DEFCORE = os.path.join(REPO_PATH, TEMPEST_TEST_LIST_DIR,
'defcore_req.txt')
TEMPEST_RAW_LIST = os.path.join(TEMPEST_RESULTS_DIR, 'test_raw_list.txt')
TEMPEST_LIST = os.path.join(TEMPEST_RESULTS_DIR, 'test_list.txt')
-REFSTACK_RESULTS_DIR = os.path.join(CONST.dir_results,
+REFSTACK_RESULTS_DIR = os.path.join(CONST.__getattribute__('dir_results'),
'refstack')
-CI_INSTALLER_TYPE = CONST.INSTALLER_TYPE
-CI_INSTALLER_IP = CONST.INSTALLER_IP
+CI_INSTALLER_TYPE = CONST.__getattribute__('INSTALLER_TYPE')
+CI_INSTALLER_IP = CONST.__getattribute__('INSTALLER_IP')
""" logging configuration """
logger = logging.getLogger(__name__)
@@ -52,26 +53,27 @@ def create_tempest_resources(use_custom_images=False,
logger.debug("Creating tenant and user for Tempest suite")
tenant_id = os_utils.create_tenant(
keystone_client,
- CONST.tempest_identity_tenant_name,
- CONST.tempest_identity_tenant_description)
+ CONST.__getattribute__('tempest_identity_tenant_name'),
+ CONST.__getattribute__('tempest_identity_tenant_description'))
if not tenant_id:
logger.error("Failed to create %s tenant"
- % CONST.tempest_identity_tenant_name)
+ % CONST.__getattribute__('tempest_identity_tenant_name'))
- user_id = os_utils.create_user(keystone_client,
- CONST.tempest_identity_user_name,
- CONST.tempest_identity_user_password,
- None, tenant_id)
+ user_id = os_utils.create_user(
+ keystone_client,
+ CONST.__getattribute__('tempest_identity_user_name'),
+ CONST.__getattribute__('tempest_identity_user_password'),
+ None, tenant_id)
if not user_id:
logger.error("Failed to create %s user" %
- CONST.tempest_identity_user_name)
+ CONST.__getattribute__('tempest_identity_user_name'))
logger.debug("Creating private network for Tempest suite")
network_dic = os_utils.create_shared_network_full(
- CONST.tempest_private_net_name,
- CONST.tempest_private_subnet_name,
- CONST.tempest_router_name,
- CONST.tempest_private_subnet_cidr)
+ CONST.__getattribute__('tempest_private_net_name'),
+ CONST.__getattribute__('tempest_private_subnet_name'),
+ CONST.__getattribute__('tempest_router_name'),
+ CONST.__getattribute__('tempest_private_subnet_cidr'))
if network_dic is None:
raise Exception('Failed to create private network')
@@ -80,41 +82,45 @@ def create_tempest_resources(use_custom_images=False,
flavor_id = ""
flavor_id_alt = ""
- if CONST.tempest_use_custom_images or use_custom_images:
+ if (CONST.__getattribute__('tempest_use_custom_images') or
+ use_custom_images):
# adding alternative image should be trivial should we need it
logger.debug("Creating image for Tempest suite")
_, image_id = os_utils.get_or_create_image(
- CONST.openstack_image_name, GLANCE_IMAGE_PATH,
- CONST.openstack_image_disk_format)
+ CONST.__getattribute__('openstack_image_name'),
+ GLANCE_IMAGE_PATH,
+ CONST.__getattribute__('openstack_image_disk_format'))
if image_id is None:
raise Exception('Failed to create image')
if use_custom_images:
logger.debug("Creating 2nd image for Tempest suite")
_, image_id_alt = os_utils.get_or_create_image(
- CONST.openstack_image_name_alt, GLANCE_IMAGE_PATH,
- CONST.openstack_image_disk_format)
+ CONST.__getattribute__('openstack_image_name_alt'),
+ GLANCE_IMAGE_PATH,
+ CONST.__getattribute__('openstack_image_disk_format'))
if image_id_alt is None:
raise Exception('Failed to create image')
- if CONST.tempest_use_custom_flavors or use_custom_flavors:
+ if (CONST.__getattribute__('tempest_use_custom_flavors') or
+ use_custom_flavors):
# adding alternative flavor should be trivial should we need it
logger.debug("Creating flavor for Tempest suite")
_, flavor_id = os_utils.get_or_create_flavor(
- CONST.openstack_flavor_name,
- CONST.openstack_flavor_ram,
- CONST.openstack_flavor_disk,
- CONST.openstack_flavor_vcpus)
+ CONST.__getattribute__('openstack_flavor_name'),
+ CONST.__getattribute__('openstack_flavor_ram'),
+ CONST.__getattribute__('openstack_flavor_disk'),
+ CONST.__getattribute__('openstack_flavor_vcpus'))
if flavor_id is None:
raise Exception('Failed to create flavor')
if use_custom_flavors:
logger.debug("Creating 2nd flavor for tempest_defcore")
_, flavor_id_alt = os_utils.get_or_create_flavor(
- CONST.openstack_flavor_name_alt,
- CONST.openstack_flavor_ram,
- CONST.openstack_flavor_disk,
- CONST.openstack_flavor_vcpus)
+ CONST.__getattribute__('openstack_flavor_name_alt'),
+ CONST.__getattribute__('openstack_flavor_ram'),
+ CONST.__getattribute__('openstack_flavor_disk'),
+ CONST.__getattribute__('openstack_flavor_vcpus'))
if flavor_id_alt is None:
raise Exception('Failed to create flavor')
@@ -132,7 +138,7 @@ def get_verifier_id():
Returns verifer id for current Tempest
"""
cmd = ("rally verify list-verifiers | awk '/" +
- CONST.tempest_deployment_name +
+ CONST.__getattribute__('tempest_deployment_name') +
"/ {print $2}'")
p = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
@@ -149,7 +155,7 @@ def get_verifier_deployment_id():
Returns deployment id for active Rally deployment
"""
cmd = ("rally deployment list | awk '/" +
- CONST.rally_deployment_name +
+ CONST.__getattribute__('rally_deployment_name') +
"/ {print $2}'")
p = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE,
@@ -168,7 +174,7 @@ def get_verifier_repo_dir(verifier_id):
if not verifier_id:
verifier_id = get_verifier_id()
- return os.path.join(CONST.dir_rally_inst,
+ return os.path.join(CONST.__getattribute__('dir_rally_inst'),
'verification',
'verifier-{}'.format(verifier_id),
'repo')
@@ -184,7 +190,7 @@ def get_verifier_deployment_dir(verifier_id, deployment_id):
if not deployment_id:
deployment_id = get_verifier_deployment_id()
- return os.path.join(CONST.dir_rally_inst,
+ return os.path.join(CONST.__getattribute__('dir_rally_inst'),
'verification',
'verifier-{}'.format(verifier_id),
'for-deployment-{}'.format(deployment_id))
@@ -247,8 +253,9 @@ def configure_tempest_defcore(deployment_dir, img_flavor_dict):
with open(conf_file, 'wb') as config_file:
config.write(config_file)
- confpath = os.path.join(CONST.dir_functest_test,
- CONST.refstack_tempest_conf_path)
+ confpath = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ CONST.__getattribute__('refstack_tempest_conf_path'))
shutil.copyfile(conf_file, confpath)
@@ -263,32 +270,37 @@ def configure_tempest_update_params(tempest_conf_file,
config.set(
'compute',
'fixed_network_name',
- CONST.tempest_private_net_name)
+ CONST.__getattribute__('tempest_private_net_name'))
config.set('compute', 'volume_device_name',
- CONST.tempest_volume_device_name)
- if CONST.tempest_use_custom_images:
+ CONST.__getattribute__('tempest_volume_device_name'))
+ if CONST.__getattribute__('tempest_use_custom_images'):
if IMAGE_ID is not None:
config.set('compute', 'image_ref', IMAGE_ID)
if IMAGE_ID_ALT is not None:
config.set('compute', 'image_ref_alt', IMAGE_ID_ALT)
- if CONST.tempest_use_custom_flavors:
+ if CONST.__getattribute__('tempest_use_custom_flavors'):
if FLAVOR_ID is not None:
config.set('compute', 'flavor_ref', FLAVOR_ID)
if FLAVOR_ID_ALT is not None:
config.set('compute', 'flavor_ref_alt', FLAVOR_ID_ALT)
- config.set('identity', 'tenant_name', CONST.tempest_identity_tenant_name)
- config.set('identity', 'username', CONST.tempest_identity_user_name)
- config.set('identity', 'password', CONST.tempest_identity_user_password)
+ config.set('identity', 'tenant_name',
+ CONST.__getattribute__('tempest_identity_tenant_name'))
+ config.set('identity', 'username',
+ CONST.__getattribute__('tempest_identity_user_name'))
+ config.set('identity', 'password',
+ CONST.__getattribute__('tempest_identity_user_password'))
config.set('identity', 'region', 'RegionOne')
config.set(
- 'validation', 'ssh_timeout', CONST.tempest_validation_ssh_timeout)
+ 'validation', 'ssh_timeout',
+ CONST.__getattribute__('tempest_validation_ssh_timeout'))
config.set('object-storage', 'operator_role',
- CONST.tempest_object_storage_operator_role)
+ CONST.__getattribute__('tempest_object_storage_operator_role'))
- if CONST.OS_ENDPOINT_TYPE is not None:
+ if CONST.__getattribute__('OS_ENDPOINT_TYPE') is not None:
sections = config.sections()
if os_utils.is_keystone_v3():
- config.set('identity', 'v3_endpoint_type', CONST.OS_ENDPOINT_TYPE)
+ config.set('identity', 'v3_endpoint_type',
+ CONST.__getattribute__('OS_ENDPOINT_TYPE'))
if 'identity-feature-enabled' not in sections:
config.add_section('identity-feature-enabled')
config.set('identity-feature-enabled', 'api_v2', False)
@@ -304,7 +316,7 @@ def configure_tempest_update_params(tempest_conf_file,
if service not in sections:
config.add_section(service)
config.set(service, 'endpoint_type',
- CONST.OS_ENDPOINT_TYPE)
+ CONST.__getattribute__('OS_ENDPOINT_TYPE'))
with open(tempest_conf_file, 'wb') as config_file:
config.write(config_file)
@@ -365,22 +377,22 @@ def configure_tempest_multisite_params(tempest_conf_file):
"StrictHostKeyChecking=no")
# Get the controller IP from the fuel node
- cmd = 'sshpass -p %s ssh 2>/dev/null %s %s@%s \
- \'fuel node --env 1| grep controller | grep "True\| 1" \
- | awk -F\| "{print \$5}"\'' % (installer_password,
+ cmd = ('sshpass -p %s ssh 2>/dev/null %s %s@%s '
+ '\'fuel node --env 1| grep controller | grep "True\| 1" '
+ '| awk -F\| "{print \$5}"\'' % (installer_password,
ssh_options,
installer_username,
- installer_ip)
+ installer_ip))
multisite_controller_ip = "".join(os.popen(cmd).read().split())
# Login to controller and get bind host details
- cmd = 'sshpass -p %s ssh 2>/dev/null %s %s@%s "ssh %s \\" \
- grep -e "^bind_" %s \\""' % (installer_password,
- ssh_options,
- installer_username,
- installer_ip,
- multisite_controller_ip,
- kingbird_conf_path)
+ cmd = ('sshpass -p %s ssh 2>/dev/null %s %s@%s "ssh %s \\" '
+ 'grep -e "^bind_" %s \\""' % (installer_password,
+ ssh_options,
+ installer_username,
+ installer_ip,
+ multisite_controller_ip,
+ kingbird_conf_path))
bind_details = os.popen(cmd).read()
bind_details = "".join(bind_details.split())
# Extract port number from the bind details
diff --git a/functest/opnfv_tests/openstack/tempest/tempest.py b/functest/opnfv_tests/openstack/tempest/tempest.py
index 984e2a1b..233ceb48 100644
--- a/functest/opnfv_tests/openstack/tempest/tempest.py
+++ b/functest/opnfv_tests/openstack/tempest/tempest.py
@@ -8,6 +8,8 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
+from __future__ import division
+
import logging
import os
import re
@@ -79,8 +81,8 @@ class TempestCommon(testcase.TestCase):
result_file = open(conf_utils.TEMPEST_LIST, 'w')
black_tests = []
try:
- installer_type = CONST.INSTALLER_TYPE
- deploy_scenario = CONST.DEPLOY_SCENARIO
+ installer_type = CONST.__getattribute__('INSTALLER_TYPE')
+ deploy_scenario = CONST.__getattribute__('DEPLOY_SCENARIO')
if (bool(installer_type) * bool(deploy_scenario)):
# if INSTALLER_TYPE and DEPLOY_SCENARIO are set we read the
# file
@@ -117,9 +119,9 @@ class TempestCommon(testcase.TestCase):
header = ("Tempest environment:\n"
" SUT: %s\n Scenario: %s\n Node: %s\n Date: %s\n" %
- (CONST.INSTALLER_TYPE,
- CONST.DEPLOY_SCENARIO,
- CONST.NODE_NAME,
+ (CONST.__getattribute__('INSTALLER_TYPE'),
+ CONST.__getattribute__('DEPLOY_SCENARIO'),
+ CONST.__getattribute__('NODE_NAME'),
time.strftime("%a %b %d %H:%M:%S %Z %Y")))
f_stdout = open(
@@ -181,7 +183,13 @@ class TempestCommon(testcase.TestCase):
try:
num_executed = int(num_tests) - int(num_skipped)
- success_rate = 100 * int(num_success) / int(num_executed)
+ try:
+ self.result = 100 * int(num_success) / int(num_executed)
+ except ZeroDivisionError:
+ logger.error("No test has been executed")
+ self.result = 0
+ return
+
with open(os.path.join(conf_utils.TEMPEST_RESULTS_DIR,
"tempest.log"), 'r') as logfile:
output = logfile.read()
@@ -198,12 +206,10 @@ class TempestCommon(testcase.TestCase):
"errors": error_logs,
"skipped": skipped_testcase}
except Exception:
- success_rate = 0
+ self.result = 0
- self.result = ft_utils.check_success_rate(
- self.case_name, success_rate)
- logger.info("Tempest %s success_rate is %s%%, is marked as %s"
- % (self.case_name, success_rate, self.result))
+ logger.info("Tempest %s success_rate is %s%%"
+ % (self.case_name, self.result))
def run(self):
@@ -267,7 +273,8 @@ class TempestMultisite(TempestCommon):
TempestCommon.__init__(self, **kwargs)
self.MODE = "feature_multisite"
self.OPTION = "--concurrency 1"
- conf_utils.install_verifier_ext(CONST.dir_repo_kingbird)
+ conf_utils.install_verifier_ext(
+ CONST.__getattribute__('dir_repo_kingbird'))
class TempestCustom(TempestCommon):
diff --git a/functest/opnfv_tests/openstack/vping/vping_base.py b/functest/opnfv_tests/openstack/vping/vping_base.py
index 8bf263eb..8e71bf82 100644
--- a/functest/opnfv_tests/openstack/vping/vping_base.py
+++ b/functest/opnfv_tests/openstack/vping/vping_base.py
@@ -7,281 +7,194 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
+from datetime import datetime
+import logging
import os
-import pprint
import time
-from datetime import datetime
+import uuid
-import functest.core.testcase as testcase
-import functest.utils.openstack_utils as os_utils
+from functest.core.testcase import TestCase
+from functest.utils import functest_utils
from functest.utils.constants import CONST
+from snaps.openstack import create_flavor
+from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor
+from snaps.openstack.create_network import NetworkSettings, SubnetSettings
+from snaps.openstack.tests import openstack_tests
+from snaps.openstack.utils import deploy_utils, nova_utils
+
+
+class VPingBase(TestCase):
+
+ """
+ Base class for vPing tests that check connectivity between two VMs shared
+ internal network.
+ This class is responsible for creating the image, internal network.
+ """
-class VPingBase(testcase.TestCase):
def __init__(self, **kwargs):
super(VPingBase, self).__init__(**kwargs)
- self.logger = None
- self.functest_repo = CONST.dir_repo_functest
- self.repo = CONST.dir_vping
- self.vm1_name = CONST.vping_vm_name_1
- self.vm2_name = CONST.vping_vm_name_2
- self.vm_boot_timeout = 180
- self.vm_delete_timeout = 100
- self.ping_timeout = CONST.vping_ping_timeout
-
- self.image_name = CONST.vping_image_name
- self.image_filename = CONST.openstack_image_file_name
- self.image_format = CONST.openstack_image_disk_format
- self.image_username = CONST.openstack_image_username
- self.image_password = CONST.openstack_image_password
- self.image_path = os.path.join(CONST.dir_functest_data,
- self.image_filename)
-
- self.flavor_name = CONST.vping_vm_flavor
+
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ self.functest_repo = CONST.__getattribute__('dir_repo_functest')
+ self.guid = ''
+ if CONST.__getattribute__('vping_unique_names'):
+ self.guid = '-' + str(uuid.uuid4())
+
+ self.os_creds = openstack_tests.get_credentials(
+ os_env_file=CONST.__getattribute__('openstack_creds'))
+
+ self.repo = CONST.__getattribute__('dir_vping')
+
+ self.creators = list()
+ self.image_creator = None
+ self.network_creator = None
+ self.vm1_creator = None
+ self.vm2_creator = None
+
+ self.self_cleanup = CONST.__getattribute__('vping_cleanup_objects')
+
+ # Image constants
+ self.image_name =\
+ CONST.__getattribute__('vping_image_name') + self.guid
+
+ # VM constants
+ self.vm1_name = CONST.__getattribute__('vping_vm_name_1') + self.guid
+ self.vm2_name = CONST.__getattribute__('vping_vm_name_2') + self.guid
+ self.vm_boot_timeout = CONST.__getattribute__('vping_vm_boot_timeout')
+ self.vm_delete_timeout =\
+ CONST.__getattribute__('vping_vm_delete_timeout')
+ self.vm_ssh_connect_timeout = CONST.vping_vm_ssh_connect_timeout
+ self.ping_timeout = CONST.__getattribute__('vping_ping_timeout')
+ self.flavor_name = 'vping-flavor' + self.guid
# NEUTRON Private Network parameters
- self.private_net_name = CONST.vping_private_net_name
- self.private_subnet_name = CONST.vping_private_subnet_name
- self.private_subnet_cidr = CONST.vping_private_subnet_cidr
- self.router_name = CONST.vping_router_name
- self.sg_name = CONST.vping_sg_name
- self.sg_desc = CONST.vping_sg_desc
- self.neutron_client = os_utils.get_neutron_client()
- self.glance_client = os_utils.get_glance_client()
- self.nova_client = os_utils.get_nova_client()
-
- def run(self, **kwargs):
- if not self.check_repo_exist():
- return testcase.TestCase.EX_RUN_ERROR
-
- image_id = self.create_image()
- if not image_id:
- return testcase.TestCase.EX_RUN_ERROR
-
- flavor = self.get_flavor()
- if not flavor:
- return testcase.TestCase.EX_RUN_ERROR
-
- network_id = self.create_network_full()
- if not network_id:
- return testcase.TestCase.EX_RUN_ERROR
-
- sg_id = self.create_security_group()
- if not sg_id:
- return testcase.TestCase.EX_RUN_ERROR
-
- self.delete_exist_vms()
+ self.private_net_name =\
+ CONST.__getattribute__('vping_private_net_name') + self.guid
+ self.private_subnet_name =\
+ CONST.__getattribute__('vping_private_subnet_name') + self.guid
+ self.private_subnet_cidr =\
+ CONST.__getattribute__('vping_private_subnet_cidr')
- self.start_time = time.time()
- self.logger.info("vPing Start Time:'%s'" % (
- datetime.fromtimestamp(self.start_time).strftime(
- '%Y-%m-%d %H:%M:%S')))
+ scenario = functest_utils.get_scenario()
- vm1 = self.boot_vm(self.vm1_name,
- image_id,
- flavor,
- network_id,
- None,
- sg_id)
- if not vm1:
- return testcase.TestCase.EX_RUN_ERROR
-
- test_ip = self.get_test_ip(vm1)
- vm2 = self.boot_vm(self.vm2_name,
- image_id,
- flavor,
- network_id,
- test_ip,
- sg_id)
- if not vm2:
- return testcase.TestCase.EX_RUN_ERROR
-
- EXIT_CODE = self.do_vping(vm2, test_ip)
- if EXIT_CODE == testcase.TestCase.EX_RUN_ERROR:
- return EXIT_CODE
+ self.flavor_metadata = create_flavor.MEM_PAGE_SIZE_ANY
+ if 'ovs' in scenario or 'fdio' in scenario:
+ self.flavor_metadata = create_flavor.MEM_PAGE_SIZE_LARGE
- self.stop_time = time.time()
- self.parse_result(EXIT_CODE,
- self.start_time,
- self.stop_time)
- return testcase.TestCase.EX_OK
+ self.cirros_image_config = None
- def boot_vm_preparation(self, config, vmname, test_ip):
- pass
+ # Move this configuration option up for all tests to leverage
+ if hasattr(CONST, 'snaps_images_cirros'):
+ self.cirros_image_config = CONST.__getattribute__(
+ 'snaps_images_cirros')
- def do_vping(self, vm, test_ip):
- raise NotImplementedError('vping execution is not implemented')
+ def run(self):
+ """
+ Begins the test execution which should originate from the subclass
+ """
- def check_repo_exist(self):
if not os.path.exists(self.functest_repo):
- self.logger.error("Functest repository not found '%s'"
- % self.functest_repo)
- return False
- return True
+ raise Exception(
+ "Functest repository not found '%s'" % self.functest_repo)
- def create_image(self):
- _, image_id = os_utils.get_or_create_image(self.image_name,
- self.image_path,
- self.image_format)
- if not image_id:
- return None
+ self.logger.info('Begin virtual environment setup')
- return image_id
+ self.start_time = time.time()
+ self.logger.info("vPing Start Time:'%s'" % (
+ datetime.fromtimestamp(self.start_time).strftime(
+ '%Y-%m-%d %H:%M:%S')))
- def get_flavor(self):
- try:
- flavor = self.nova_client.flavors.find(name=self.flavor_name)
- self.logger.info("Using existing Flavor '%s'..."
- % self.flavor_name)
- return flavor
- except:
- self.logger.error("Flavor '%s' not found." % self.flavor_name)
- self.logger.info("Available flavors are: ")
- self.pMsg(self.nova_client.flavor.list())
- return None
-
- def create_network_full(self):
- network_dic = os_utils.create_network_full(self.neutron_client,
- self.private_net_name,
- self.private_subnet_name,
- self.router_name,
- self.private_subnet_cidr)
-
- if not network_dic:
- self.logger.error(
- "There has been a problem when creating the neutron network")
- return None
- network_id = network_dic["net_id"]
- return network_id
-
- def create_security_group(self):
- sg_id = os_utils.get_security_group_id(self.neutron_client,
- self.sg_name)
- if sg_id != '':
- self.logger.info("Using existing security group '%s'..."
- % self.sg_name)
+ self.__delete_exist_vms()
+
+ image_base_name = self.image_name + '-' + str(self.guid)
+ os_image_settings = openstack_tests.cirros_image_settings(
+ image_base_name, image_metadata=self.cirros_image_config)
+ self.logger.info("Creating image with name: '%s'" % self.image_name)
+
+ self.image_creator = deploy_utils.create_image(
+ self.os_creds, os_image_settings)
+ self.creators.append(self.image_creator)
+
+ self.logger.info(
+ "Creating network with name: '%s'" % self.private_net_name)
+ self.network_creator = deploy_utils.create_network(
+ self.os_creds,
+ NetworkSettings(name=self.private_net_name,
+ subnet_settings=[SubnetSettings(
+ name=self.private_subnet_name,
+ cidr=self.private_subnet_cidr)]))
+ self.creators.append(self.network_creator)
+
+ self.logger.info(
+ "Creating flavor with name: '%s'" % self.flavor_name)
+ flavor_creator = OpenStackFlavor(
+ self.os_creds,
+ FlavorSettings(name=self.flavor_name, ram=512, disk=1, vcpus=1,
+ metadata=self.flavor_metadata))
+ flavor_creator.create()
+ self.creators.append(flavor_creator)
+
+ def _execute(self):
+ """
+ Method called by subclasses after environment has been setup
+ :return: the exit code
+ """
+ self.logger.info('Begin test execution')
+
+ test_ip = self.vm1_creator.get_port_ip(
+ self.vm1_creator.instance_settings.port_settings[0].name)
+
+ if self.vm1_creator.vm_active(
+ block=True) and self.vm2_creator.vm_active(block=True):
+ result = self._do_vping(self.vm2_creator, test_ip)
else:
- self.logger.info("Creating security group '%s'..."
- % self.sg_name)
- SECGROUP = os_utils.create_security_group(self.neutron_client,
- self.sg_name,
- self.sg_desc)
- if not SECGROUP:
- self.logger.error("Failed to create the security group...")
- return None
-
- sg_id = SECGROUP['id']
-
- self.logger.debug("Security group '%s' with ID=%s created "
- "successfully." % (SECGROUP['name'], sg_id))
-
- self.logger.debug("Adding ICMP rules in security group '%s'..."
- % self.sg_name)
- if not os_utils.create_secgroup_rule(self.neutron_client, sg_id,
- 'ingress', 'icmp'):
- self.logger.error("Failed to create security group rule...")
- return None
-
- self.logger.debug("Adding SSH rules in security group '%s'..."
- % self.sg_name)
- if not os_utils.create_secgroup_rule(self.neutron_client, sg_id,
- 'ingress', 'tcp',
- '22', '22'):
- self.logger.error("Failed to create security group rule...")
- return None
-
- if not os_utils.create_secgroup_rule(
- self.neutron_client, sg_id, 'egress', 'tcp', '22', '22'):
- self.logger.error("Failed to create security group rule...")
- return None
- return sg_id
-
- def delete_exist_vms(self):
- servers = self.nova_client.servers.list()
+ raise Exception('VMs never became active')
+
+ if result == TestCase.EX_RUN_ERROR:
+ return TestCase.EX_RUN_ERROR
+
+ self.stop_time = time.time()
+ self.result = 100
+ return TestCase.EX_OK
+
+ def _cleanup(self):
+ """
+ Cleanup all OpenStack objects. Should be called on completion
+ :return:
+ """
+ if self.self_cleanup:
+ for creator in reversed(self.creators):
+ try:
+ creator.clean()
+ except Exception as e:
+ self.logger.error('Unexpected error cleaning - %s', e)
+
+ def _do_vping(self, vm_creator, test_ip):
+ """
+ Method to be implemented by subclasses
+ Begins the real test after the OpenStack environment has been setup
+ :param vm_creator: the SNAPS VM instance creator object
+ :param test_ip: the IP to which the VM needs to issue the ping
+ :return: T/F
+ """
+ raise NotImplementedError('vping execution is not implemented')
+
+ def __delete_exist_vms(self):
+ """
+ Cleans any existing VMs using the same name.
+ """
+ nova_client = nova_utils.nova_client(self.os_creds)
+ servers = nova_client.servers.list()
for server in servers:
if server.name == self.vm1_name or server.name == self.vm2_name:
self.logger.info("Deleting instance %s..." % server.name)
server.delete()
- def boot_vm(self, vmname, image_id, flavor, network_id, test_ip, sg_id):
- config = dict()
- config['name'] = vmname
- config['flavor'] = flavor
- config['image'] = image_id
- config['nics'] = [{"net-id": network_id}]
- self.boot_vm_preparation(config, vmname, test_ip)
- self.logger.info("Creating instance '%s'..." % vmname)
- self.logger.debug("Configuration: %s" % config)
- vm = self.nova_client.servers.create(**config)
-
- # wait until VM status is active
- if not self.waitVmActive(self.nova_client, vm):
- vm_status = os_utils.get_instance_status(self.nova_client, vm)
- self.logger.error("Instance '%s' cannot be booted. Status is '%s'"
- % (vmname, vm_status))
- return None
- else:
- self.logger.info("Instance '%s' is ACTIVE." % vmname)
-
- self.add_secgroup(vmname, vm.id, sg_id)
-
- return vm
-
- def waitVmActive(self, nova, vm):
- # sleep and wait for VM status change
- sleep_time = 3
- count = self.vm_boot_timeout / sleep_time
- while True:
- status = os_utils.get_instance_status(nova, vm)
- self.logger.debug("Status: %s" % status)
- if status == "ACTIVE":
- return True
- if status == "ERROR" or status == "error":
- return False
- if count == 0:
- self.logger.debug("Booting a VM timed out...")
- return False
- count -= 1
- time.sleep(sleep_time)
- return False
-
- def add_secgroup(self, vmname, vm_id, sg_id):
- self.logger.info("Adding '%s' to security group '%s'..." %
- (vmname, self.sg_name))
- os_utils.add_secgroup_to_instance(self.nova_client, vm_id, sg_id)
-
- def get_test_ip(self, vm):
- test_ip = vm.networks.get(self.private_net_name)[0]
- self.logger.debug("Instance '%s' got %s" % (vm.name, test_ip))
- return test_ip
-
- def parse_result(self, code, start_time, stop_time):
- test_status = "FAIL"
- if code == 0:
- self.logger.info("vPing OK")
- duration = round(stop_time - start_time, 1)
- self.logger.info("vPing duration:'%s'" % duration)
- test_status = "PASS"
- elif code == -2:
- duration = 0
- self.logger.info("Userdata is not supported in nova boot. "
- "Aborting test...")
- else:
- duration = 0
- self.logger.error("vPing FAILED")
-
- self.details = {'timestart': start_time,
- 'duration': duration,
- 'status': test_status}
- self.result = test_status
-
- @staticmethod
- def pMsg(msg):
- """pretty printing"""
- pprint.PrettyPrinter(indent=4).pprint(msg)
-
class VPingMain(object):
+
def __init__(self, vping_cls):
self.vping = vping_cls()
@@ -291,6 +204,6 @@ class VPingMain(object):
if result != VPingBase.EX_OK:
return result
if kwargs['report']:
- return self.vping.push_to_db()
- except Exception:
+ return self.vping.publish_report()
+ except:
return VPingBase.EX_RUN_ERROR
diff --git a/functest/opnfv_tests/openstack/vping/vping_ssh.py b/functest/opnfv_tests/openstack/vping/vping_ssh.py
index e87da363..0ad77402 100755
--- a/functest/opnfv_tests/openstack/vping/vping_ssh.py
+++ b/functest/opnfv_tests/openstack/vping/vping_ssh.py
@@ -7,124 +7,161 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
-import logging
+import argparse
import os
-import re
+from scp import SCPClient
import sys
import time
-import argparse
-import paramiko
-from scp import SCPClient
-
-import functest.utils.openstack_utils as os_utils
+from snaps.openstack.create_instance import FloatingIpSettings, \
+ VmInstanceSettings
+from snaps.openstack.create_keypairs import KeypairSettings
+from snaps.openstack.create_network import PortSettings
+from snaps.openstack.create_router import RouterSettings
+from snaps.openstack.create_security_group import Direction, Protocol, \
+ SecurityGroupSettings, SecurityGroupRuleSettings
+from snaps.openstack.utils import deploy_utils
+
+from functest.core.testcase import TestCase
+from functest.opnfv_tests.openstack.snaps import snaps_utils
+from functest.utils.constants import CONST
import vping_base
-import functest.core.testcase as testcase
class VPingSSH(vping_base.VPingBase):
+ """
+ Class to execute the vPing test using a Floating IP to connect to one VM
+ to issue the ping command to the second
+ """
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_ssh"
super(VPingSSH, self).__init__(**kwargs)
- self.logger = logging.getLogger(__name__)
-
- def do_vping(self, vm, test_ip):
- floatip = self.add_float_ip(vm)
- if not floatip:
- return testcase.TestCase.EX_RUN_ERROR
- ssh = self.establish_ssh(vm, floatip)
- if not ssh:
- return testcase.TestCase.EX_RUN_ERROR
- if not self.transfer_ping_script(ssh, floatip):
- return testcase.TestCase.EX_RUN_ERROR
- return self.do_vping_ssh(ssh, test_ip)
-
- def add_float_ip(self, vm):
- self.logger.info("Creating floating IP for VM '%s'..." % self.vm2_name)
- floatip_dic = os_utils.create_floating_ip(self.neutron_client)
- floatip = floatip_dic['fip_addr']
-
- if floatip is None:
- self.logger.error("Cannot create floating IP.")
- return None
- self.logger.info("Floating IP created: '%s'" % floatip)
-
- self.logger.info("Associating floating ip: '%s' to VM '%s' "
- % (floatip, self.vm2_name))
- if not os_utils.add_floating_ip(self.nova_client, vm.id, floatip):
- self.logger.error("Cannot associate floating IP to VM.")
- return None
-
- return floatip
-
- def establish_ssh(self, vm, floatip):
- self.logger.info("Trying to establish SSH connection to %s..."
- % floatip)
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- timeout = 50
- nolease = False
- got_ip = False
- discover_count = 0
- cidr_first_octet = self.private_subnet_cidr.split('.')[0]
- while timeout > 0:
- try:
- ssh.connect(floatip, username=self.image_username,
- password=self.image_password, timeout=2)
- self.logger.debug("SSH connection established to %s."
- % floatip)
- break
- except:
- self.logger.debug("Waiting for %s..." % floatip)
- time.sleep(6)
- timeout -= 1
-
- console_log = vm.get_console_output()
-
- # print each "Sending discover" captured on the console log
- if (len(re.findall("Sending discover", console_log)) >
- discover_count and not got_ip):
- discover_count += 1
- self.logger.debug("Console-log '%s': Sending discover..."
- % self.vm2_name)
-
- # check if eth0 got an ip,the line looks like this:
- # "inet addr:192.168."....
- # if the dhcp agent fails to assing ip, this line will not appear
- if "inet addr:" + cidr_first_octet in console_log and not got_ip:
- got_ip = True
- self.logger.debug("The instance '%s' succeeded to get the IP "
- "from the dhcp agent." % self.vm2_name)
-
- # if dhcp not work,it shows "No lease, failing".The test will fail
- if ("No lease, failing" in console_log and
- not nolease and
- not got_ip):
- nolease = True
- self.logger.debug("Console-log '%s': No lease, failing..."
- % self.vm2_name)
- self.logger.info("The instance failed to get an IP from DHCP "
- "agent. The test will probably timeout...")
-
- if timeout == 0: # 300 sec timeout (5 min)
- self.logger.error("Cannot establish connection to IP '%s'. "
- "Aborting" % floatip)
- return None
- return ssh
-
- def transfer_ping_script(self, ssh, floatip):
- self.logger.info("Trying to transfer ping.sh to %s..." % floatip)
+
+ self.kp_name = CONST.__getattribute__('vping_keypair_name') + self.guid
+ self.kp_priv_file = CONST.__getattribute__('vping_keypair_priv_file')
+ self.kp_pub_file = CONST.__getattribute__('vping_keypair_pub_file')
+ self.router_name =\
+ CONST.__getattribute__('vping_router_name') + self.guid
+ self.sg_name = CONST.__getattribute__('vping_sg_name') + self.guid
+ self.sg_desc = CONST.__getattribute__('vping_sg_desc')
+
+ self.ext_net_name = snaps_utils.get_ext_net_name(self.os_creds)
+
+ def run(self):
+ """
+ Sets up the OpenStack keypair, router, security group, and VM instance
+ objects then validates the ping.
+ :return: the exit code from the super.execute() method
+ """
+ try:
+ super(VPingSSH, self).run()
+
+ self.logger.info("Creating keypair with name: '%s'" % self.kp_name)
+ kp_creator = deploy_utils.create_keypair(
+ self.os_creds,
+ KeypairSettings(name=self.kp_name,
+ private_filepath=self.kp_priv_file,
+ public_filepath=self.kp_pub_file))
+ self.creators.append(kp_creator)
+
+ # Creating router to external network
+ self.logger.info("Creating router with name: '%s'"
+ % self.router_name)
+ net_set = self.network_creator.network_settings
+ sub_set = [net_set.subnet_settings[0].name]
+ router_creator = deploy_utils.create_router(
+ self.os_creds,
+ RouterSettings(
+ name=self.router_name,
+ external_gateway=self.ext_net_name,
+ internal_subnets=sub_set))
+ self.creators.append(router_creator)
+
+ # Creating Instance 1
+ port1_settings = PortSettings(
+ name=self.vm1_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance1_settings = VmInstanceSettings(
+ name=self.vm1_name, flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ vm_delete_timeout=self.vm_delete_timeout,
+ ssh_connect_timeout=self.vm_ssh_connect_timeout,
+ port_settings=[port1_settings])
+
+ self.logger.info(
+ "Creating VM 1 instance with name: '%s'"
+ % instance1_settings.name)
+ self.vm1_creator = deploy_utils.create_vm_instance(
+ self.os_creds,
+ instance1_settings,
+ self.image_creator.image_settings,
+ keypair_creator=kp_creator)
+ self.creators.append(self.vm1_creator)
+
+ # Creating Instance 2
+ sg_creator = self.__create_security_group()
+ self.creators.append(sg_creator)
+
+ port2_settings = PortSettings(
+ name=self.vm2_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance2_settings = VmInstanceSettings(
+ name=self.vm2_name, flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ vm_delete_timeout=self.vm_delete_timeout,
+ ssh_connect_timeout=self.vm_ssh_connect_timeout,
+ port_settings=[port2_settings],
+ security_group_names=[sg_creator.sec_grp_settings.name],
+ floating_ip_settings=[FloatingIpSettings(
+ name=self.vm2_name + '-FIPName',
+ port_name=port2_settings.name,
+ router_name=router_creator.router_settings.name)])
+
+ self.logger.info(
+ "Creating VM 2 instance with name: '%s'"
+ % instance2_settings.name)
+ self.vm2_creator = deploy_utils.create_vm_instance(
+ self.os_creds,
+ instance2_settings,
+ self.image_creator.image_settings,
+ keypair_creator=kp_creator)
+ self.creators.append(self.vm2_creator)
+
+ return self._execute()
+ except Exception as e:
+ self.logger.error('Unexpected error running test - ' + e.message)
+ return TestCase.EX_RUN_ERROR
+ finally:
+ self._cleanup()
+
+ def _do_vping(self, vm_creator, test_ip):
+ """
+ Override from super
+ """
+ if vm_creator.vm_ssh_active(block=True):
+ ssh = vm_creator.ssh_client()
+ if not self.__transfer_ping_script(ssh):
+ return TestCase.EX_RUN_ERROR
+ return self.__do_vping_ssh(ssh, test_ip)
+ else:
+ return -1
+
+ def __transfer_ping_script(self, ssh):
+ """
+ Uses SCP to copy the ping script via the SSH client
+ :param ssh: the SSH client
+ :return:
+ """
+ self.logger.info("Trying to transfer ping.sh")
scp = SCPClient(ssh.get_transport())
local_path = self.functest_repo + "/" + self.repo
ping_script = os.path.join(local_path, "ping.sh")
try:
scp.put(ping_script, "~/")
except:
- self.logger.error("Cannot SCP the file '%s' to VM '%s'"
- % (ping_script, floatip))
+ self.logger.error("Cannot SCP the file '%s'" % ping_script)
return False
cmd = 'chmod 755 ~/ping.sh'
@@ -134,8 +171,14 @@ class VPingSSH(vping_base.VPingBase):
return True
- def do_vping_ssh(self, ssh, test_ip):
- EXIT_CODE = -1
+ def __do_vping_ssh(self, ssh, test_ip):
+ """
+ Pings the test_ip via the SSH client
+ :param ssh: the SSH client used to issue the ping command
+ :param test_ip: the IP for the ping command to use
+ :return: exit_code (int)
+ """
+ exit_code = TestCase.EX_TESTCASE_FAILED
self.logger.info("Waiting for ping...")
sec = 0
@@ -150,7 +193,7 @@ class VPingSSH(vping_base.VPingBase):
for line in output:
if "vPing OK" in line:
self.logger.info("vPing detected!")
- EXIT_CODE = 0
+ exit_code = TestCase.EX_OK
flag = True
break
@@ -162,11 +205,38 @@ class VPingSSH(vping_base.VPingBase):
break
self.logger.debug("Pinging %s. Waiting for response..." % test_ip)
sec += 1
- return EXIT_CODE
+ return exit_code
+
+ def __create_security_group(self):
+ """
+ Configures and deploys an OpenStack security group object
+ :return: the creator object
+ """
+ sg_rules = list()
+ sg_rules.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sg_name,
+ direction=Direction.ingress,
+ protocol=Protocol.icmp))
+ sg_rules.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sg_name,
+ direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22,
+ port_range_max=22))
+ sg_rules.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sg_name,
+ direction=Direction.egress,
+ protocol=Protocol.tcp, port_range_min=22,
+ port_range_max=22))
+
+ self.logger.info("Security group with name: '%s'" % self.sg_name)
+ return deploy_utils.create_security_group(self.os_creds,
+ SecurityGroupSettings(
+ name=self.sg_name,
+ description=self.sg_desc,
+ rule_settings=sg_rules))
if __name__ == '__main__':
- logging.basicConfig()
args_parser = argparse.ArgumentParser()
args_parser.add_argument("-r", "--report",
help="Create json result file",
diff --git a/functest/opnfv_tests/openstack/vping/vping_userdata.py b/functest/opnfv_tests/openstack/vping/vping_userdata.py
index 05dda9de..8ea9be84 100755
--- a/functest/opnfv_tests/openstack/vping/vping_userdata.py
+++ b/functest/opnfv_tests/openstack/vping/vping_userdata.py
@@ -7,74 +7,144 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
-import logging
+import argparse
import sys
import time
-import argparse
+from functest.core.testcase import TestCase
+
+from snaps.openstack.utils import deploy_utils
+from snaps.openstack.create_instance import VmInstanceSettings
+from snaps.openstack.create_network import PortSettings
import vping_base
class VPingUserdata(vping_base.VPingBase):
+ """
+ Class to execute the vPing test using userdata and the VM's console
+ """
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_userdata"
super(VPingUserdata, self).__init__(**kwargs)
- self.logger = logging.getLogger(__name__)
-
- def boot_vm_preparation(self, config, vmname, test_ip):
- config['config_drive'] = True
- if vmname == self.vm2_name:
- u = ("#!/bin/sh\n\n"
- "while true; do\n"
- " ping -c 1 %s 2>&1 >/dev/null\n"
- " RES=$?\n"
- " if [ \"Z$RES\" = \"Z0\" ] ; then\n"
- " echo 'vPing OK'\n"
- " break\n"
- " else\n"
- " echo 'vPing KO'\n"
- " fi\n"
- " sleep 1\n"
- "done\n" % test_ip)
- config['userdata'] = u
-
- def do_vping(self, vm, test_ip):
+
+ def run(self):
+ """
+ Sets up the OpenStack VM instance objects then executes the ping and
+ validates.
+ :return: the exit code from the super.execute() method
+ """
+ try:
+ super(VPingUserdata, self).run()
+
+ # Creating Instance 1
+ port1_settings = PortSettings(
+ name=self.vm1_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance1_settings = VmInstanceSettings(
+ name=self.vm1_name,
+ flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ port_settings=[port1_settings])
+
+ self.logger.info(
+ "Creating VM 1 instance with name: '%s'"
+ % instance1_settings.name)
+ self.vm1_creator = deploy_utils.create_vm_instance(
+ self.os_creds, instance1_settings,
+ self.image_creator.image_settings)
+ self.creators.append(self.vm1_creator)
+
+ userdata = _get_userdata(
+ self.vm1_creator.get_port_ip(port1_settings.name))
+ if userdata:
+ # Creating Instance 2
+ port2_settings = PortSettings(
+ name=self.vm2_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance2_settings = VmInstanceSettings(
+ name=self.vm2_name,
+ flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ port_settings=[port2_settings],
+ userdata=userdata)
+
+ self.logger.info(
+ "Creating VM 2 instance with name: '%s'"
+ % instance2_settings.name)
+ self.vm2_creator = deploy_utils.create_vm_instance(
+ self.os_creds, instance2_settings,
+ self.image_creator.image_settings)
+ self.creators.append(self.vm2_creator)
+ else:
+ raise Exception('Userdata is None')
+
+ return self._execute()
+
+ finally:
+ self._cleanup()
+
+ def _do_vping(self, vm_creator, test_ip):
+ """
+ Override from super
+ """
self.logger.info("Waiting for ping...")
- EXIT_CODE = -1
+ exit_code = -1
sec = 0
tries = 0
while True:
time.sleep(1)
- p_console = vm.get_console_output()
+ p_console = vm_creator.get_vm_inst().get_console_output()
if "vPing OK" in p_console:
self.logger.info("vPing detected!")
- EXIT_CODE = 0
+ exit_code = TestCase.EX_OK
break
elif "failed to read iid from metadata" in p_console or tries > 5:
- EXIT_CODE = -2
+ exit_code = TestCase.EX_TESTCASE_FAILED
break
elif sec == self.ping_timeout:
self.logger.info("Timeout reached.")
break
elif sec % 10 == 0:
if "request failed" in p_console:
- self.logger.debug("It seems userdata is not supported "
- "in nova boot. Waiting a bit...")
+ self.logger.debug(
+ "It seems userdata is not supported in nova boot. " +
+ "Waiting a bit...")
tries += 1
else:
- self.logger.debug("Pinging %s. Waiting for response..."
- % test_ip)
+ self.logger.debug(
+ "Pinging %s. Waiting for response..." % test_ip)
sec += 1
- return EXIT_CODE
+ return exit_code
+
+
+def _get_userdata(test_ip):
+ """
+ Returns the post VM creation script to be added into the VM's userdata
+ :param test_ip: the IP value to substitute into the script
+ :return: the bash script contents
+ """
+ if test_ip:
+ return ("#!/bin/sh\n\n"
+ "while true; do\n"
+ " ping -c 1 %s 2>&1 >/dev/null\n"
+ " RES=$?\n"
+ " if [ \"Z$RES\" = \"Z0\" ] ; then\n"
+ " echo 'vPing OK'\n"
+ " break\n"
+ " else\n"
+ " echo 'vPing KO'\n"
+ " fi\n"
+ " sleep 1\n"
+ "done\n" % test_ip)
+ return None
if __name__ == '__main__':
- logging.basicConfig()
args_parser = argparse.ArgumentParser()
args_parser.add_argument("-r", "--report",
help="Create json result file",
diff --git a/functest/opnfv_tests/sdn/odl/odl.py b/functest/opnfv_tests/sdn/odl/odl.py
index e50d9c13..2f3dd74b 100755
--- a/functest/opnfv_tests/sdn/odl/odl.py
+++ b/functest/opnfv_tests/sdn/odl/odl.py
@@ -25,12 +25,13 @@ import logging
import os
import re
import sys
-import urlparse
import robot.api
from robot.errors import RobotError
import robot.run
from robot.utils.robottime import timestamp_to_secs
+from six import StringIO
+from six.moves import urllib
from functest.core import testcase
import functest.utils.openstack_utils as op_utils
@@ -87,10 +88,10 @@ class ODLTests(testcase.TestCase):
try:
for line in fileinput.input(odl_variables_files,
inplace=True):
- print re.sub("AUTH = .*",
+ print(re.sub("AUTH = .*",
("AUTH = [u'" + odlusername + "', u'" +
odlpassword + "']"),
- line.rstrip())
+ line.rstrip()))
return True
except Exception as ex: # pylint: disable=broad-except
cls.__logger.error("Cannot set ODL creds: %s", str(ex))
@@ -150,7 +151,7 @@ class ODLTests(testcase.TestCase):
odlusername = kwargs['odlusername']
odlpassword = kwargs['odlpassword']
osauthurl = kwargs['osauthurl']
- keystoneip = urlparse.urlparse(osauthurl).hostname
+ keystoneip = urllib.parse.urlparse(osauthurl).hostname
variables = ['KEYSTONE:' + keystoneip,
'NEUTRON:' + kwargs['neutronip'],
'OS_AUTH_URL:"' + osauthurl + '"',
@@ -172,16 +173,11 @@ class ODLTests(testcase.TestCase):
self.__logger.exception(
"Cannot create %s", self.res_dir)
return self.EX_RUN_ERROR
- stdout_file = os.path.join(self.res_dir, 'stdout.txt')
output_dir = os.path.join(self.res_dir, 'output.xml')
- with open(stdout_file, 'w+') as stdout:
- robot.run(*suites, variable=variables,
- output=output_dir,
- log='NONE',
- report='NONE',
- stdout=stdout)
- stdout.seek(0, 0)
- self.__logger.info("\n" + stdout.read())
+ stream = StringIO()
+ robot.run(*suites, variable=variables, output=output_dir,
+ log='NONE', report='NONE', stdout=stream)
+ self.__logger.info("\n" + stream.getvalue())
self.__logger.info("ODL results were successfully generated")
try:
self.parse_results()
@@ -190,10 +186,6 @@ class ODLTests(testcase.TestCase):
self.__logger.error("Run tests before publishing: %s",
ex.message)
return self.EX_RUN_ERROR
- try:
- os.remove(stdout_file)
- except OSError:
- self.__logger.warning("Cannot remove %s", stdout_file)
return self.EX_OK
else:
return self.EX_RUN_ERROR
@@ -218,7 +210,7 @@ class ODLTests(testcase.TestCase):
except KeyError:
pass
neutron_url = op_utils.get_endpoint(service_type='network')
- kwargs = {'neutronip': urlparse.urlparse(neutron_url).hostname}
+ kwargs = {'neutronip': urllib.parse.urlparse(neutron_url).hostname}
kwargs['odlip'] = kwargs['neutronip']
kwargs['odlwebport'] = '8080'
kwargs['odlrestconfport'] = '8181'
diff --git a/functest/opnfv_tests/sdn/onos/onos.py b/functest/opnfv_tests/sdn/onos/onos.py
index d7a2d38e..5dfff036 100644
--- a/functest/opnfv_tests/sdn/onos/onos.py
+++ b/functest/opnfv_tests/sdn/onos/onos.py
@@ -25,7 +25,7 @@ class OnosBase(testcase.TestCase):
onos_repo_path = CONST.__getattribute__('dir_repo_onos')
onos_sfc_image_name = CONST.__getattribute__('onos_sfc_image_name')
onos_sfc_image_path = os.path.join(
- CONST.__getattribute__('dir_functest_data'),
+ CONST.__getattribute__('dir_functest_images'),
CONST.__getattribute__('onos_sfc_image_file_name'))
onos_sfc_path = os.path.join(CONST.__getattribute__('dir_repo_functest'),
CONST.__getattribute__('dir_onos_sfc'))
diff --git a/functest/opnfv_tests/vnf/ims/opera_ims.py b/functest/opnfv_tests/vnf/ims/opera_ims.py
index 8defdee6..8c33d16e 100644
--- a/functest/opnfv_tests/vnf/ims/opera_ims.py
+++ b/functest/opnfv_tests/vnf/ims/opera_ims.py
@@ -16,14 +16,14 @@ from opera import openo_connect
import requests
import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
-from functest.utils.constants import CONST
class OperaIms(clearwater_ims_base.ClearwaterOnBoardingBase):
- def __init__(self, project='functest', case_name='opera_ims',
- repo=CONST.dir_repo_opera, cmd=''):
- super(OperaIms, self).__init__(project, case_name, repo, cmd)
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = "opera_ims"
+ super(OperaIms, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.ellis_file = os.path.join(self.result_dir, 'ellis.info')
self.live_test_file = os.path.join(self.result_dir,
diff --git a/functest/tests/unit/ci/test_prepare_env.py b/functest/tests/unit/ci/test_prepare_env.py
index 39a0825c..513e7230 100644
--- a/functest/tests/unit/ci/test_prepare_env.py
+++ b/functest/tests/unit/ci/test_prepare_env.py
@@ -18,8 +18,6 @@ from opnfv.utils import constants as opnfv_constants
class PrepareEnvTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.prepare_envparser = prepare_env.PrepareEnvParser()
@@ -192,12 +190,17 @@ class PrepareEnvTesting(unittest.TestCase):
CONST.__getattribute__('dir_functest_conf'))
mock_method.assert_any_call(
CONST.__getattribute__('dir_functest_data'))
+ mock_method.assert_any_call(
+ CONST.__getattribute__('dir_functest_images'))
mock_logger_info.assert_any_call(" %s created." %
CONST.__getattribute__(
'dir_functest_conf'))
mock_logger_info.assert_any_call(" %s created." %
CONST.__getattribute__(
'dir_functest_data'))
+ mock_logger_info.assert_any_call(" %s created." %
+ CONST.__getattribute__(
+ 'dir_functest_images'))
@mock.patch('functest.ci.prepare_env.logger.info')
@mock.patch('functest.ci.prepare_env.logger.debug')
@@ -213,6 +216,9 @@ class PrepareEnvTesting(unittest.TestCase):
mock_logger_debug.assert_any_call(" %s already exists." %
CONST.__getattribute__(
'dir_functest_data'))
+ mock_logger_debug.assert_any_call(" %s already exists." %
+ CONST.__getattribute__(
+ 'dir_functest_images'))
def _get_env_cred_dict(self, os_prefix=''):
return {'OS_USERNAME': os_prefix + 'username',
@@ -456,4 +462,5 @@ class PrepareEnvTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_run_tests.py b/functest/tests/unit/ci/test_run_tests.py
index ed3379d0..88e5d2b8 100644
--- a/functest/tests/unit/ci/test_run_tests.py
+++ b/functest/tests/unit/ci/test_run_tests.py
@@ -29,8 +29,6 @@ class FakeModule(TestCase):
class RunTestsTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.runner = run_tests.Runner()
self.sep = 'test_sep'
@@ -274,4 +272,5 @@ class RunTestsTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_tier_builder.py b/functest/tests/unit/ci/test_tier_builder.py
index feaf33a8..989c0870 100644
--- a/functest/tests/unit/ci/test_tier_builder.py
+++ b/functest/tests/unit/ci/test_tier_builder.py
@@ -15,8 +15,6 @@ from functest.ci import tier_builder
class TierBuilderTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.dependency = {'installer': 'test_installer',
'scenario': 'test_scenario'}
@@ -88,4 +86,5 @@ class TierBuilderTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_tier_handler.py b/functest/tests/unit/ci/test_tier_handler.py
index 28006274..c93fffd3 100644
--- a/functest/tests/unit/ci/test_tier_handler.py
+++ b/functest/tests/unit/ci/test_tier_handler.py
@@ -15,8 +15,6 @@ from functest.ci import tier_handler
class TierHandlerTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.test = mock.Mock()
attrs = {'get_name.return_value': 'test_name'}
@@ -139,4 +137,5 @@ class TierHandlerTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/cli/commands/test_cli_env.py b/functest/tests/unit/cli/commands/test_cli_env.py
index 4b6ea57a..14e926eb 100644
--- a/functest/tests/unit/cli/commands/test_cli_env.py
+++ b/functest/tests/unit/cli/commands/test_cli_env.py
@@ -18,8 +18,6 @@ from functest.tests.unit import test_utils
class CliEnvTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.cli_environ = cli_env.CliEnv()
@@ -28,7 +26,7 @@ class CliEnvTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_testcase.ft_utils.execute_command')
def test_prepare_default(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/prepare_env.py start" %
- CONST.dir_repo_functest)
+ CONST.__getattribute__('dir_repo_functest'))
self.cli_environ.prepare()
mock_ft_utils.assert_called_with(cmd)
@@ -40,29 +38,30 @@ class CliEnvTesting(unittest.TestCase):
mock.patch('functest.cli.commands.cli_testcase.os.remove') \
as mock_os_remove:
cmd = ("python %s/functest/ci/prepare_env.py start" %
- CONST.dir_repo_functest)
+ CONST.__getattribute__('dir_repo_functest'))
self.cli_environ.prepare()
- mock_os_remove.assert_called_once_with(CONST.env_active)
+ mock_os_remove.assert_called_once_with(
+ CONST.__getattribute__('env_active'))
mock_ft_utils.assert_called_with(cmd)
def _test_show_missing_env_var(self, var, *args):
if var == 'INSTALLER_TYPE':
- CONST.INSTALLER_TYPE = None
+ CONST.__setattr__('INSTALLER_TYPE', None)
reg_string = "| INSTALLER: Unknown, \S+\s*|"
elif var == 'INSTALLER_IP':
- CONST.INSTALLER_IP = None
+ CONST.__setattr__('INSTALLER_IP', None)
reg_string = "| INSTALLER: \S+, Unknown\s*|"
elif var == 'SCENARIO':
- CONST.DEPLOY_SCENARIO = None
+ CONST.__setattr__('DEPLOY_SCENARIO', None)
reg_string = "| SCENARIO: Unknown\s*|"
elif var == 'NODE':
- CONST.NODE_NAME = None
+ CONST.__setattr__('NODE_NAME', None)
reg_string = "| POD: Unknown\s*|"
elif var == 'BUILD_TAG':
- CONST.BUILD_TAG = None
+ CONST.__setattr__('BUILD_TAG', None)
reg_string = "| BUILD TAG: None|"
elif var == 'DEBUG':
- CONST.CI_DEBUG = None
+ CONST.__setattr__('CI_DEBUG', None)
reg_string = "| DEBUG FLAG: false\s*|"
elif var == 'STATUS':
reg_string = "| STATUS: not ready\s*|"
@@ -106,7 +105,7 @@ class CliEnvTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_env.os.path.exists',
return_value=False)
def test_show_missing_git_repo_dir(self, *args):
- CONST.dir_repo_functest = None
+ CONST.__setattr__('dir_repo_functest', None)
self.assertRaises(NoSuchPathError, lambda: self.cli_environ.show())
@mock.patch('functest.cli.commands.cli_env.click.echo')
@@ -127,4 +126,5 @@ class CliEnvTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/cli/commands/test_cli_os.py b/functest/tests/unit/cli/commands/test_cli_os.py
index f0e58c67..7ab4ddc3 100644
--- a/functest/tests/unit/cli/commands/test_cli_os.py
+++ b/functest/tests/unit/cli/commands/test_cli_os.py
@@ -18,7 +18,6 @@ from functest.utils.constants import CONST
class CliOpenStackTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
def setUp(self):
self.endpoint_ip = 'test_ip'
@@ -69,10 +68,10 @@ class CliOpenStackTesting(unittest.TestCase):
def test_fetch_credentials_default(self, mock_click_echo,
mock_os_path,
mock_ftutils_execute):
- CONST.INSTALLER_TYPE = self.installer_type
- CONST.INSTALLER_IP = self.installer_ip
+ CONST.__setattr__('INSTALLER_TYPE', self.installer_type)
+ CONST.__setattr__('INSTALLER_IP', self.installer_ip)
cmd = ("%s/releng/utils/fetch_os_creds.sh -d %s -i %s -a %s"
- % (CONST.dir_repos,
+ % (CONST.__getattribute__('dir_repos'),
self.openstack_creds,
self.installer_type,
self.installer_ip))
@@ -92,15 +91,13 @@ class CliOpenStackTesting(unittest.TestCase):
def test_fetch_credentials_missing_installer_type(self, mock_click_echo,
mock_os_path,
mock_ftutils_execute):
- installer_type = None
- installer_ip = self.installer_ip
- CONST.INSTALLER_TYPE = installer_type
- CONST.INSTALLER_IP = installer_ip
+ CONST.__setattr__('INSTALLER_TYPE', None)
+ CONST.__setattr__('INSTALLER_IP', self.installer_ip)
cmd = ("%s/releng/utils/fetch_os_creds.sh -d %s -i %s -a %s"
- % (CONST.dir_repos,
+ % (CONST.__getattribute__('dir_repos'),
self.openstack_creds,
- installer_type,
- installer_ip))
+ None,
+ self.installer_ip))
self.cli_os.openstack_creds = self.openstack_creds
self.cli_os.fetch_credentials()
mock_click_echo.assert_any_call("The environment variable "
@@ -109,8 +106,8 @@ class CliOpenStackTesting(unittest.TestCase):
mock_click_echo.assert_any_call("Fetching credentials from "
"installer node '%s' with "
"IP=%s.." %
- (installer_type,
- installer_ip))
+ (None,
+ self.installer_ip))
mock_ftutils_execute.assert_called_once_with(cmd, verbose=False)
@mock.patch('functest.cli.commands.cli_os.ft_utils.execute_command')
@@ -122,10 +119,10 @@ class CliOpenStackTesting(unittest.TestCase):
mock_ftutils_execute):
installer_type = self.installer_type
installer_ip = None
- CONST.INSTALLER_TYPE = installer_type
- CONST.INSTALLER_IP = installer_ip
+ CONST.__setattr__('INSTALLER_TYPE', installer_type)
+ CONST.__setattr__('INSTALLER_IP', installer_ip)
cmd = ("%s/releng/utils/fetch_os_creds.sh -d %s -i %s -a %s"
- % (CONST.dir_repos,
+ % (CONST.__getattribute__('dir_repos'),
self.openstack_creds,
installer_type,
installer_ip))
@@ -144,8 +141,9 @@ class CliOpenStackTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_os.ft_utils.execute_command')
def test_check(self, mock_ftutils_execute):
with mock.patch.object(self.cli_os, 'ping_endpoint'):
- CONST.dir_repo_functest = self.dir_repo_functest
- cmd = CONST.dir_repo_functest + "/functest/ci/check_os.sh"
+ CONST.__setattr__('dir_repo_functest', self.dir_repo_functest)
+ cmd = os.path.join(CONST.__getattribute__('dir_repo_functest'),
+ "functest/ci/check_os.sh")
self.cli_os.check()
mock_ftutils_execute.assert_called_once_with(cmd, verbose=False)
@@ -235,4 +233,5 @@ class CliOpenStackTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/cli/commands/test_cli_testcase.py b/functest/tests/unit/cli/commands/test_cli_testcase.py
index 39c8139d..fddfc317 100644
--- a/functest/tests/unit/cli/commands/test_cli_testcase.py
+++ b/functest/tests/unit/cli/commands/test_cli_testcase.py
@@ -17,8 +17,6 @@ from functest.utils.constants import CONST
class CliTestCasesTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.testname = 'testname'
with mock.patch('functest.cli.commands.cli_testcase.tb'):
@@ -42,7 +40,9 @@ class CliTestCasesTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_testcase.ft_utils.execute_command')
def test_run_default(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "-n -r ", self.testname))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "-n -r ", self.testname))
self.cli_tests.run(self.testname, noclean=True, report=True)
mock_ft_utils.assert_called_with(cmd)
@@ -51,7 +51,9 @@ class CliTestCasesTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_testcase.ft_utils.execute_command')
def test_run_noclean_missing_report(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "-n ", self.testname))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "-n ", self.testname))
self.cli_tests.run(self.testname, noclean=True, report=False)
mock_ft_utils.assert_called_with(cmd)
@@ -60,7 +62,9 @@ class CliTestCasesTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_testcase.ft_utils.execute_command')
def test_run_report_missing_noclean(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "-r ", self.testname))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "-r ", self.testname))
self.cli_tests.run(self.testname, noclean=False, report=True)
mock_ft_utils.assert_called_with(cmd)
@@ -69,7 +73,9 @@ class CliTestCasesTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_testcase.ft_utils.execute_command')
def test_run_missing_noclean_report(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "", self.testname))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "", self.testname))
self.cli_tests.run(self.testname, noclean=False, report=False)
mock_ft_utils.assert_called_with(cmd)
@@ -100,4 +106,5 @@ class CliTestCasesTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/cli/commands/test_cli_tier.py b/functest/tests/unit/cli/commands/test_cli_tier.py
index 802359f1..550eec93 100644
--- a/functest/tests/unit/cli/commands/test_cli_tier.py
+++ b/functest/tests/unit/cli/commands/test_cli_tier.py
@@ -17,8 +17,6 @@ from functest.utils.constants import CONST
class CliTierTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.tiername = 'tiername'
self.testnames = 'testnames'
@@ -90,8 +88,9 @@ class CliTierTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_tier.ft_utils.execute_command')
def test_run_default(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "-n -r ",
- self.tiername))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "-n -r ", self.tiername))
self.cli_tier.run(self.tiername, noclean=True, report=True)
mock_ft_utils.assert_called_with(cmd)
@@ -100,8 +99,9 @@ class CliTierTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_tier.ft_utils.execute_command')
def test_run_report_missing_noclean(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "-r ",
- self.tiername))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "-r ", self.tiername))
self.cli_tier.run(self.tiername, noclean=False, report=True)
mock_ft_utils.assert_called_with(cmd)
@@ -110,8 +110,9 @@ class CliTierTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_tier.ft_utils.execute_command')
def test_run_noclean_missing_report(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "-n ",
- self.tiername))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "-n ", self.tiername))
self.cli_tier.run(self.tiername, noclean=True, report=False)
mock_ft_utils.assert_called_with(cmd)
@@ -120,11 +121,13 @@ class CliTierTesting(unittest.TestCase):
@mock.patch('functest.cli.commands.cli_tier.ft_utils.execute_command')
def test_run_missing_noclean_report(self, mock_ft_utils, mock_os):
cmd = ("python %s/functest/ci/run_tests.py "
- "%s -t %s" % (CONST.dir_repo_functest, "",
- self.tiername))
+ "%s -t %s" %
+ (CONST.__getattribute__('dir_repo_functest'),
+ "", self.tiername))
self.cli_tier.run(self.tiername, noclean=False, report=False)
mock_ft_utils.assert_called_with(cmd)
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/cli/test_cli_base.py b/functest/tests/unit/cli/test_cli_base.py
index fe065c2a..89603279 100644
--- a/functest/tests/unit/cli/test_cli_base.py
+++ b/functest/tests/unit/cli/test_cli_base.py
@@ -22,8 +22,6 @@ with mock.patch('functest.cli.commands.cli_testcase.CliTestcase.__init__',
class CliBaseTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.runner = CliRunner()
self._openstack = cli_base._openstack
@@ -135,4 +133,5 @@ class CliBaseTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_feature.py b/functest/tests/unit/core/test_feature.py
index 8de42ec5..0160c8e1 100644
--- a/functest/tests/unit/core/test_feature.py
+++ b/functest/tests/unit/core/test_feature.py
@@ -17,10 +17,6 @@ import mock
from functest.core import feature
from functest.core import testcase
-# logging must be disabled else it calls time.time()
-# what will break these unit tests.
-logging.disable(logging.CRITICAL)
-
class FeatureTestingBase(unittest.TestCase):
@@ -95,4 +91,7 @@ class BashFeatureTesting(FeatureTestingBase):
if __name__ == "__main__":
+ # logging must be disabled else it calls time.time()
+ # what will break these unit tests.
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_pytest_suite_runner.py b/functest/tests/unit/core/test_pytest_suite_runner.py
new file mode 100644
index 00000000..07ac7906
--- /dev/null
+++ b/functest/tests/unit/core/test_pytest_suite_runner.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from functest.core import pytest_suite_runner
+from functest.core import testcase
+
+
+class PyTestSuiteRunnerTesting(unittest.TestCase):
+
+ def setUp(self):
+ self.psrunner = pytest_suite_runner.PyTestSuiteRunner()
+ self.result = mock.Mock()
+ attrs = {'errors': [('test1', 'error_msg1')],
+ 'failures': [('test2', 'failure_msg1')]}
+ self.result.configure_mock(**attrs)
+
+ self.pass_results = mock.Mock()
+ attrs = {'errors': None,
+ 'failures': None}
+ self.pass_results.configure_mock(**attrs)
+
+ def test_run(self):
+ self.psrunner.case_name = 'test_case_name'
+ with mock.patch('functest.core.pytest_suite_runner.'
+ 'unittest.TextTestRunner.run',
+ return_value=self.result):
+ self.assertEqual(self.psrunner.run(),
+ testcase.TestCase.EX_OK)
+
+ with mock.patch('functest.core.pytest_suite_runner.'
+ 'unittest.TextTestRunner.run',
+ return_value=self.pass_results):
+ self.assertEqual(self.psrunner.run(),
+ testcase.TestCase.EX_OK)
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_testcase.py b/functest/tests/unit/core/test_testcase.py
index 72229671..ef0983cc 100644
--- a/functest/tests/unit/core/test_testcase.py
+++ b/functest/tests/unit/core/test_testcase.py
@@ -7,7 +7,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-"""Define the classe required to fully cover testcase."""
+"""Define the class required to fully cover testcase."""
import logging
import unittest
@@ -23,8 +23,6 @@ class TestCaseTesting(unittest.TestCase):
"""The class testing TestCase."""
# pylint: disable=missing-docstring,too-many-public-methods
- logging.disable(logging.CRITICAL)
-
_case_name = "base"
_project_name = "functest"
_published_result = "PASS"
@@ -225,4 +223,5 @@ class TestCaseTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_vnf.py b/functest/tests/unit/core/test_vnf.py
index f348c0db..e322773e 100644
--- a/functest/tests/unit/core/test_vnf.py
+++ b/functest/tests/unit/core/test_vnf.py
@@ -7,32 +7,141 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
+# pylint: disable=missing-docstring
+
import logging
+import os
import unittest
+import mock
+
from functest.core import vnf
+from functest.core import testcase
class VnfBaseTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
- self.test = vnf.VnfOnBoarding(project='functest',
- case_name='aaa')
+ self.test = vnf.VnfOnBoarding(
+ project='functest', case_name='aaa')
self.test.project = "functest"
self.test.start_time = "1"
self.test.stop_time = "5"
self.test.result = ""
- self.test.details = {"orchestrator": {"status": "PASS",
- "result": "",
- "duration": 20},
- "vnf": {"status": "PASS",
- "result": "",
- "duration": 15},
- "test_vnf": {"status": "FAIL",
- "result": "",
- "duration": 5}}
+ self.test.details = {
+ "orchestrator": {"status": "PASS", "result": "", "duration": 20},
+ "vnf": {"status": "PASS", "result": "", "duration": 15},
+ "test_vnf": {"status": "FAIL", "result": "", "duration": 5}}
+ self.test.keystone_client = 'test_client'
+ self.test.tenant_name = 'test_tenant_name'
+
+ def test_execute_deploy_vnf_fail(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=None), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ side_effect=Exception):
+ self.assertEqual(self.test.execute(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_execute_test_vnf_fail(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=None), \
+ mock.patch.object(self.test, 'deploy_vnf'), \
+ mock.patch.object(self.test, 'test_vnf',
+ side_effect=Exception):
+ self.assertEqual(self.test.execute(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ @mock.patch('functest.core.vnf.os_utils.get_tenant_id',
+ return_value='test_tenant_id')
+ @mock.patch('functest.core.vnf.os_utils.delete_tenant',
+ return_value=True)
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_user_id')
+ @mock.patch('functest.core.vnf.os_utils.delete_user',
+ return_value=True)
+ def test_execute_default(self, *args):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=None), \
+ mock.patch.object(self.test, 'deploy_vnf'), \
+ mock.patch.object(self.test, 'test_vnf'), \
+ mock.patch.object(self.test, 'parse_results',
+ return_value='ret_exit_code'), \
+ mock.patch.object(self.test, 'log_results'):
+ self.assertEqual(self.test.execute(),
+ 'ret_exit_code')
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id', return_value='')
+ def test_prepare_missing_userid(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='')
+ def test_prepare_missing_tenantid(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='test_tenantid')
+ @mock.patch('functest.core.vnf.os_utils.get_role_id',
+ return_value='')
+ def test_prepare_missing_roleid(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='test_tenantid')
+ @mock.patch('functest.core.vnf.os_utils.get_role_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.add_role_user',
+ return_value='')
+ def test_prepare_role_add_failure(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='test_tenantid')
+ @mock.patch('functest.core.vnf.os_utils.get_role_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.add_role_user')
+ @mock.patch('functest.core.vnf.os_utils.create_user',
+ return_value='')
+ def test_create_user_failure(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ def test_log_results_default(self):
+ with mock.patch('functest.core.vnf.'
+ 'ft_utils.logger_test_results') \
+ as mock_method:
+ self.test.log_results()
+ self.assertTrue(mock_method.called)
+
+ def test_step_failures_default(self):
+ with self.assertRaises(Exception):
+ self.test.step_failure("error_msg")
def test_deploy_vnf_unimplemented(self):
with self.assertRaises(Exception) as context:
@@ -44,9 +153,15 @@ class VnfBaseTesting(unittest.TestCase):
self.test.test_vnf()()
self.assertTrue('VNF not tested' in context.exception)
- def test_parse_results(self):
- self.assertNotEqual(self.test.parse_results(), 0)
+ def test_parse_results_ex_ok(self):
+ self.test.details['test_vnf']['status'] = 'PASS'
+ self.assertEqual(self.test.parse_results(), os.EX_OK)
+
+ def test_parse_results_ex_run_error(self):
+ self.test.details['vnf']['status'] = 'FAIL'
+ self.assertEqual(self.test.parse_results(), os.EX_SOFTWARE)
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/energy/test_functest_energy.py b/functest/tests/unit/energy/test_functest_energy.py
index ffe044bc..6387b97b 100644
--- a/functest/tests/unit/energy/test_functest_energy.py
+++ b/functest/tests/unit/energy/test_functest_energy.py
@@ -19,8 +19,6 @@ import functest.energy.energy as energy
CASE_NAME = "UNIT_test_CASE"
STEP_NAME = "UNIT_test_STEP"
-logging.disable(logging.CRITICAL)
-
class MockHttpResponse(object): # pylint: disable=too-few-public-methods
"""Mock response for Energy recorder API."""
@@ -274,4 +272,5 @@ class EnergyRecorderTest(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/features/test_barometer.py b/functest/tests/unit/features/test_barometer.py
index 8ca463b2..c6512615 100644
--- a/functest/tests/unit/features/test_barometer.py
+++ b/functest/tests/unit/features/test_barometer.py
@@ -23,8 +23,6 @@ from functest.opnfv_tests.features import barometer
class BarometerTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
_case_name = "barometercollectd"
_project_name = "barometer"
@@ -47,4 +45,5 @@ class BarometerTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/odl/test_odl.py b/functest/tests/unit/odl/test_odl.py
index d7ce70c7..60adf211 100644
--- a/functest/tests/unit/odl/test_odl.py
+++ b/functest/tests/unit/odl/test_odl.py
@@ -12,14 +12,14 @@
import errno
import logging
import os
-import StringIO
import unittest
from keystoneauth1.exceptions import auth_plugins
import mock
from robot.errors import DataError, RobotError
-from robot.result import testcase as result_testcase
+from robot.result import model
from robot.utils.robottime import timestamp_to_secs
+import six
from functest.core import testcase
from functest.opnfv_tests.sdn.odl import odl
@@ -32,8 +32,6 @@ class ODLVisitorTesting(unittest.TestCase):
"""The class testing ODLResultVisitor."""
# pylint: disable=missing-docstring
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.visitor = odl.ODLResultVisitor()
@@ -49,11 +47,9 @@ class ODLVisitorTesting(unittest.TestCase):
'elapsedtime': 1000,
'text': 'Hello, World!',
'critical': True}
- test = result_testcase.TestCase(name=data['name'],
- status=data['status'],
- message=data['text'],
- starttime=data['starttime'],
- endtime=data['endtime'])
+ test = model.TestCase(
+ name=data['name'], status=data['status'], message=data['text'],
+ starttime=data['starttime'], endtime=data['endtime'])
test.parent = mock.Mock()
config = {'name': data['parent'],
'criticality.test_is_critical.return_value': data[
@@ -173,7 +169,7 @@ class ODLRobotTesting(ODLTesting):
os.path.join(odl.ODLTests.odl_test_repo,
'csit/variables/Variables.py'), inplace=True)
- @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stdout', new_callable=six.StringIO)
def _test_set_vars(self, msg1, msg2, *args):
line = mock.MagicMock()
line.__iter__.return_value = [msg1]
@@ -191,7 +187,7 @@ class ODLRobotTesting(ODLTesting):
def test_set_vars_auth1(self):
self._test_set_vars("AUTH1 = []", "AUTH1 = []")
- @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stdout', new_callable=six.StringIO)
def test_set_vars_auth_foo(self, *args):
line = mock.MagicMock()
line.__iter__.return_value = ["AUTH = []"]
@@ -314,8 +310,6 @@ class ODLMainTesting(ODLTesting):
def test_run_ko(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
- mock.patch.object(odl, 'open', mock.mock_open(),
- create=True), \
self.assertRaises(RobotError):
self._test_main(testcase.TestCase.EX_RUN_ERROR, *args)
@@ -324,71 +318,33 @@ class ODLMainTesting(ODLTesting):
def test_parse_results_ko(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
- mock.patch.object(odl, 'open', mock.mock_open(),
- create=True), \
mock.patch.object(self.test, 'parse_results',
side_effect=RobotError):
self._test_main(testcase.TestCase.EX_RUN_ERROR, *args)
- @mock.patch('os.remove', side_effect=Exception)
- @mock.patch('robot.run')
- @mock.patch('os.makedirs')
- def test_remove_exc(self, *args):
- with mock.patch.object(self.test, 'set_robotframework_vars',
- return_value=True), \
- mock.patch.object(self.test, 'parse_results'), \
- self.assertRaises(Exception):
- self._test_main(testcase.TestCase.EX_OK, *args)
-
- @mock.patch('os.remove')
@mock.patch('robot.run')
@mock.patch('os.makedirs')
def test_ok(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
- mock.patch.object(odl, 'open', mock.mock_open(),
- create=True), \
mock.patch.object(self.test, 'parse_results'):
self._test_main(testcase.TestCase.EX_OK, *args)
- @mock.patch('os.remove')
@mock.patch('robot.run')
@mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
def test_makedirs_oserror17(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
- mock.patch.object(odl, 'open', mock.mock_open(),
- create=True) as mock_open, \
mock.patch.object(self.test, 'parse_results'):
self._test_main(testcase.TestCase.EX_OK, *args)
- mock_open.assert_called_once_with(
- os.path.join(odl.ODLTests.res_dir, 'stdout.txt'), 'w+')
- @mock.patch('os.remove')
@mock.patch('robot.run', return_value=1)
@mock.patch('os.makedirs')
def test_testcases_in_failure(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
- mock.patch.object(odl, 'open', mock.mock_open(),
- create=True) as mock_open, \
mock.patch.object(self.test, 'parse_results'):
self._test_main(testcase.TestCase.EX_OK, *args)
- mock_open.assert_called_once_with(
- os.path.join(odl.ODLTests.res_dir, 'stdout.txt'), 'w+')
-
- @mock.patch('os.remove', side_effect=OSError)
- @mock.patch('robot.run')
- @mock.patch('os.makedirs')
- def test_remove_oserror(self, *args):
- with mock.patch.object(self.test, 'set_robotframework_vars',
- return_value=True), \
- mock.patch.object(odl, 'open', mock.mock_open(),
- create=True) as mock_open, \
- mock.patch.object(self.test, 'parse_results'):
- self._test_main(testcase.TestCase.EX_OK, *args)
- mock_open.assert_called_once_with(
- os.path.join(odl.ODLTests.res_dir, 'stdout.txt'), 'w+')
class ODLRunTesting(ODLTesting):
@@ -579,7 +535,7 @@ class ODLArgParserTesting(ODLTesting):
"--odlip={}".format(self._sdn_controller_ip)]),
self.defaultargs)
- @mock.patch('sys.stderr', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stderr', new_callable=six.StringIO)
def test_fail(self, mock_method):
self.defaultargs['foo'] = 'bar'
with self.assertRaises(SystemExit):
@@ -638,4 +594,5 @@ class ODLArgParserTesting(ODLTesting):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/openstack/rally/test_rally.py b/functest/tests/unit/openstack/rally/test_rally.py
index fe25dfcf..b9e78616 100644
--- a/functest/tests/unit/openstack/rally/test_rally.py
+++ b/functest/tests/unit/openstack/rally/test_rally.py
@@ -19,8 +19,6 @@ from functest.utils.constants import CONST
class OSRallyTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.nova_client = mock.Mock()
self.neutron_client = mock.Mock()
@@ -39,7 +37,7 @@ class OSRallyTesting(unittest.TestCase):
self.polling_iter = 2
def test_build_task_args_missing_floating_network(self):
- CONST.OS_AUTH_URL = None
+ CONST.__setattr__('OS_AUTH_URL', None)
with mock.patch('functest.opnfv_tests.openstack.rally.rally.'
'os_utils.get_external_net',
return_value=None):
@@ -47,7 +45,7 @@ class OSRallyTesting(unittest.TestCase):
self.assertEqual(task_args['floating_network'], '')
def test_build_task_args_missing_net_id(self):
- CONST.OS_AUTH_URL = None
+ CONST.__setattr__('OS_AUTH_URL', None)
self.rally_base.network_dict['net_id'] = ''
with mock.patch('functest.opnfv_tests.openstack.rally.rally.'
'os_utils.get_external_net',
@@ -56,7 +54,7 @@ class OSRallyTesting(unittest.TestCase):
self.assertEqual(task_args['netid'], '')
def test_build_task_args_missing_auth_url(self):
- CONST.OS_AUTH_URL = None
+ CONST.__setattr__('OS_AUTH_URL', None)
with mock.patch('functest.opnfv_tests.openstack.rally.rally.'
'os_utils.get_external_net',
return_value='test_floating_network'):
@@ -136,8 +134,8 @@ class OSRallyTesting(unittest.TestCase):
'lineline')
def test_excl_scenario_default(self):
- CONST.INSTALLER_TYPE = 'test_installer'
- CONST.DEPLOY_SCENARIO = 'test_scenario'
+ CONST.__setattr__('INSTALLER_TYPE', 'test_installer')
+ CONST.__setattr__('DEPLOY_SCENARIO', 'test_scenario')
dic = {'scenario': [{'scenarios': ['test_scenario'],
'installers': ['test_installer'],
'tests': ['test']}]}
@@ -154,8 +152,8 @@ class OSRallyTesting(unittest.TestCase):
[])
def test_excl_func_default(self):
- CONST.INSTALLER_TYPE = 'test_installer'
- CONST.DEPLOY_SCENARIO = 'test_scenario'
+ CONST.__setattr__('INSTALLER_TYPE', 'test_installer')
+ CONST.__setattr__('DEPLOY_SCENARIO', 'test_scenario')
dic = {'functionality': [{'functions': ['no_live_migration'],
'tests': ['test']}]}
with mock.patch('__builtin__.open', mock.mock_open()), \
@@ -343,19 +341,6 @@ class OSRallyTesting(unittest.TestCase):
self.rally_base._run_tests()
self.rally_base._run_task.assert_any_call('test1')
- @mock.patch('functest.opnfv_tests.openstack.rally.rally.logger.info')
- def test_generate_report(self, mock_logger_info):
- summary = [{'test_name': 'test_name',
- 'overall_duration': 5,
- 'nb_tests': 3,
- 'success': 5}]
- self.rally_base.summary = summary
- with mock.patch('functest.opnfv_tests.openstack.rally.rally.'
- 'ft_utils.check_success_rate',
- return_value='criteria'):
- self.rally_base._generate_report()
- self.assertTrue(mock_logger_info.called)
-
def test_clean_up_default(self):
self.rally_base.volume_type = mock.Mock()
self.rally_base.cinder_client = mock.Mock()
@@ -388,4 +373,5 @@ class OSRallyTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/openstack/refstack_client/test_refstack_client.py b/functest/tests/unit/openstack/refstack_client/test_refstack_client.py
index 60e180c9..8c149baa 100644
--- a/functest/tests/unit/openstack/refstack_client/test_refstack_client.py
+++ b/functest/tests/unit/openstack/refstack_client/test_refstack_client.py
@@ -17,11 +17,12 @@ from functest.utils.constants import CONST
class OSRefstackClientTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
- _config = \
- os.path.join(CONST.dir_functest_test, CONST.refstack_tempest_conf_path)
- _testlist = \
- os.path.join(CONST.dir_functest_test, CONST.refstack_defcore_list)
+ _config = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ CONST.__getattribute__('refstack_tempest_conf_path'))
+ _testlist = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ CONST.__getattribute__('refstack_defcore_list'))
def setUp(self):
self.defaultargs = {'config': self._config,
@@ -29,12 +30,13 @@ class OSRefstackClientTesting(unittest.TestCase):
self.refstackclient = refstack_client.RefstackClient()
def test_source_venv(self):
- CONST.dir_refstack_client = 'test_repo_dir'
+ CONST.__setattr__('dir_refstack_client', 'test_repo_dir')
with mock.patch('functest.opnfv_tests.openstack.refstack_client.'
'refstack_client.ft_utils.execute_command') as m:
cmd = ("cd {0};"
". .venv/bin/activate;"
- "cd -;".format(CONST.dir_refstack_client))
+ "cd -;"
+ .format(CONST.__getattribute__('dir_refstack_client')))
self.refstackclient.source_venv()
m.assert_any_call(cmd)
@@ -45,9 +47,10 @@ class OSRefstackClientTesting(unittest.TestCase):
'refstack_client.ft_utils.execute_command') as m:
cmd = ("cd {0};"
"./refstack-client test -c {1} -v --test-list {2};"
- "cd -;".format(CONST.dir_refstack_client,
- config,
- testlist))
+ "cd -;"
+ .format(CONST.__getattribute__('dir_refstack_client'),
+ config,
+ testlist))
self.refstackclient.run_defcore(config, testlist)
m.assert_any_call(cmd)
@@ -63,7 +66,7 @@ class OSRefstackClientTesting(unittest.TestCase):
self.assertEqual(self.refstackclient.main(**kwargs), status)
if len(args) > 0:
args[0].assert_called_once_with(
- refstack_client.RefstackClient.result_dir)
+ refstack_client.RefstackClient.result_dir)
if len(args) > 1:
args
@@ -101,4 +104,5 @@ class OSRefstackClientTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/openstack/tempest/test_conf_utils.py b/functest/tests/unit/openstack/tempest/test_conf_utils.py
index 8ca5cc5b..23f6e45c 100644
--- a/functest/tests/unit/openstack/tempest/test_conf_utils.py
+++ b/functest/tests/unit/openstack/tempest/test_conf_utils.py
@@ -16,8 +16,6 @@ from functest.utils.constants import CONST
class OSTempestConfUtilsTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def test_create_tempest_resources_missing_network_dic(self):
with mock.patch('functest.opnfv_tests.openstack.tempest.conf_utils.'
'os_utils.get_keystone_client',
@@ -54,12 +52,12 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
return_value=(mock.Mock(), None)), \
self.assertRaises(Exception) as context:
- CONST.tempest_use_custom_images = True
+ CONST.__setattr__('tempest_use_custom_images', True)
conf_utils.create_tempest_resources()
msg = 'Failed to create image'
self.assertTrue(msg in context)
- CONST.tempest_use_custom_images = False
+ CONST.__setattr__('tempest_use_custom_images', False)
conf_utils.create_tempest_resources(use_custom_images=True)
msg = 'Failed to create image'
self.assertTrue(msg in context)
@@ -84,20 +82,20 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
'os_utils.get_or_create_flavor',
return_value=(mock.Mock(), None)), \
self.assertRaises(Exception) as context:
- CONST.tempest_use_custom_images = True
- CONST.tempest_use_custom_flavors = True
+ CONST.__setattr__('tempest_use_custom_images', True)
+ CONST.__setattr__('tempest_use_custom_flavors', True)
conf_utils.create_tempest_resources()
msg = 'Failed to create flavor'
self.assertTrue(msg in context)
- CONST.tempest_use_custom_images = True
- CONST.tempest_use_custom_flavors = False
+ CONST.__setattr__('tempest_use_custom_images', True)
+ CONST.__setattr__('tempest_use_custom_flavors', False)
conf_utils.create_tempest_resources(use_custom_flavors=False)
msg = 'Failed to create flavor'
self.assertTrue(msg in context)
def test_get_verifier_id_missing_verifier(self):
- CONST.tempest_deployment_name = 'test_deploy_name'
+ CONST.__setattr__('tempest_deployment_name', 'test_deploy_name')
with mock.patch('functest.opnfv_tests.openstack.tempest.'
'conf_utils.subprocess.Popen') as mock_popen, \
self.assertRaises(Exception):
@@ -108,7 +106,7 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
conf_utils.get_verifier_id(),
def test_get_verifier_id_default(self):
- CONST.tempest_deployment_name = 'test_deploy_name'
+ CONST.__setattr__('tempest_deployment_name', 'test_deploy_name')
with mock.patch('functest.opnfv_tests.openstack.tempest.'
'conf_utils.subprocess.Popen') as mock_popen:
mock_stdout = mock.Mock()
@@ -120,7 +118,7 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
'test_deploy_id')
def test_get_verifier_deployment_id_missing_rally(self):
- CONST.rally_deployment_name = 'test_rally_deploy_name'
+ CONST.__setattr__('tempest_deployment_name', 'test_deploy_name')
with mock.patch('functest.opnfv_tests.openstack.tempest.'
'conf_utils.subprocess.Popen') as mock_popen, \
self.assertRaises(Exception):
@@ -131,7 +129,7 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
conf_utils.get_verifier_deployment_id(),
def test_get_verifier_deployment_id_default(self):
- CONST.rally_deployment_name = 'test_rally_deploy_name'
+ CONST.__setattr__('tempest_deployment_name', 'test_deploy_name')
with mock.patch('functest.opnfv_tests.openstack.tempest.'
'conf_utils.subprocess.Popen') as mock_popen:
mock_stdout = mock.Mock()
@@ -240,8 +238,8 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
mock.patch('__builtin__.open', mock.mock_open()), \
mock.patch('functest.opnfv_tests.openstack.tempest.'
'conf_utils.shutil.copyfile'):
- CONST.dir_functest_test = 'test_dir'
- CONST.refstack_tempest_conf_path = 'test_path'
+ CONST.__setattr__('dir_functest_test', 'test_dir')
+ CONST.__setattr__('refstack_tempest_conf_path', 'test_path')
conf_utils.configure_tempest_defcore('test_dep_dir',
img_flavor_dict)
mset.assert_any_call('compute', 'image_ref', 'test_image_id')
@@ -266,8 +264,8 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
mock.patch('__builtin__.open', mock.mock_open()), \
mock.patch('functest.opnfv_tests.openstack.tempest.'
'conf_utils.backup_tempest_config'):
- CONST.dir_functest_test = 'test_dir'
- CONST.OS_ENDPOINT_TYPE = None
+ CONST.__setattr__('dir_functest_test', 'test_dir')
+ CONST.__setattr__('OS_ENDPOINT_TYPE', None)
conf_utils.\
configure_tempest_update_params('test_conf_file',
IMAGE_ID=image_id,
@@ -277,25 +275,25 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
self.assertTrue(mwrite.called)
def test_configure_tempest_update_params_missing_image_id(self):
- CONST.tempest_use_custom_images = True
+ CONST.__setattr__('tempest_use_custom_images', True)
self._test_missing_param(('compute', 'image_ref',
'test_image_id'), 'test_image_id',
None)
def test_configure_tempest_update_params_missing_image_id_alt(self):
- CONST.tempest_use_custom_images = True
+ CONST.__setattr__('tempest_use_custom_images', True)
conf_utils.IMAGE_ID_ALT = 'test_image_id_alt'
self._test_missing_param(('compute', 'image_ref_alt',
'test_image_id_alt'), None, None)
def test_configure_tempest_update_params_missing_flavor_id(self):
- CONST.tempest_use_custom_flavors = True
+ CONST.__setattr__('tempest_use_custom_flavors', True)
self._test_missing_param(('compute', 'flavor_ref',
'test_flavor_id'), None,
'test_flavor_id')
def test_configure_tempest_update_params_missing_flavor_id_alt(self):
- CONST.tempest_use_custom_flavors = True
+ CONST.__setattr__('tempest_use_custom_flavors', True)
conf_utils.FLAVOR_ID_ALT = 'test_flavor_id_alt'
self._test_missing_param(('compute', 'flavor_ref_alt',
'test_flavor_id_alt'), None,
@@ -371,4 +369,5 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
mexe.assert_called_once_with(cmd, error_msg=error_msg)
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/openstack/tempest/test_tempest.py b/functest/tests/unit/openstack/tempest/test_tempest.py
index e05e5dfa..b8b258b3 100644
--- a/functest/tests/unit/openstack/tempest/test_tempest.py
+++ b/functest/tests/unit/openstack/tempest/test_tempest.py
@@ -18,8 +18,6 @@ from functest.utils.constants import CONST
class OSTempestTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
with mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
'conf_utils.get_verifier_id',
@@ -114,8 +112,8 @@ class OSTempestTesting(unittest.TestCase):
mock.patch.object(self.tempestcommon, 'read_file',
return_value=['test1', 'test2']):
conf_utils.TEMPEST_BLACKLIST = Exception
- CONST.INSTALLER_TYPE = 'installer_type'
- CONST.DEPLOY_SCENARIO = 'deploy_scenario'
+ CONST.__setattr__('INSTALLER_TYPE', 'installer_type')
+ CONST.__setattr__('DEPLOY_SCENARIO', 'deploy_scenario')
self.tempestcommon.apply_tempest_blacklist()
obj = m()
obj.write.assert_any_call('test1\n')
@@ -130,8 +128,8 @@ class OSTempestTesting(unittest.TestCase):
return_value=['test1', 'test2']), \
mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
'yaml.safe_load', return_value=item_dict):
- CONST.INSTALLER_TYPE = 'installer_type'
- CONST.DEPLOY_SCENARIO = 'deploy_scenario'
+ CONST.__setattr__('INSTALLER_TYPE', 'installer_type')
+ CONST.__setattr__('DEPLOY_SCENARIO', 'deploy_scenario')
self.tempestcommon.apply_tempest_blacklist()
obj = m()
obj.write.assert_any_call('test1\n')
@@ -151,24 +149,6 @@ class OSTempestTesting(unittest.TestCase):
assert_any_call("Starting Tempest test suite: '%s'."
% cmd_line)
- @mock.patch('functest.opnfv_tests.openstack.tempest.tempest.logger.info')
- def test_parse_verifier_result_default(self, mock_logger_info):
- self.tempestcommon.VERIFICATION_ID = 'test_uuid'
- self.tempestcommon.case_name = 'test_case_name'
- stdout = ['Testscount||2', 'Success||2', 'Skipped||0', 'Failures||0']
- with mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
- 'subprocess.Popen') as mock_popen, \
- mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
- 'ft_utils.check_success_rate') as mock_method, \
- mock.patch('__builtin__.open', mock.mock_open()):
- mock_stdout = mock.Mock()
- attrs = {'stdout': stdout}
- mock_stdout.configure_mock(**attrs)
- mock_popen.return_value = mock_stdout
-
- self.tempestcommon.parse_verifier_result()
- mock_method.assert_any_call('test_case_name', 100)
-
@mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
'os.path.exists', return_value=False)
@mock.patch('functest.opnfv_tests.openstack.tempest.tempest.os.makedirs',
@@ -248,4 +228,5 @@ class OSTempestTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_decorators.py b/functest/tests/unit/utils/test_decorators.py
new file mode 100644
index 00000000..6bd47d25
--- /dev/null
+++ b/functest/tests/unit/utils/test_decorators.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the class required to fully cover decorators."""
+
+from datetime import datetime
+import errno
+import json
+import logging
+import os
+import unittest
+
+import mock
+
+from functest.utils import decorators
+from functest.utils import functest_utils
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+VERSION = 'master'
+DIR = '/dev'
+FILE = '{}/null'.format(DIR)
+URL = 'file://{}'.format(FILE)
+
+
+class DecoratorsTesting(unittest.TestCase):
+ # pylint: disable=missing-docstring
+
+ _case_name = 'base'
+ _project_name = 'functest'
+ _start_time = 1.0
+ _stop_time = 2.0
+ _result = 'PASS'
+ _build_tag = VERSION
+ _node_name = 'bar'
+ _deploy_scenario = 'foo'
+ _installer_type = 'debian'
+
+ def setUp(self):
+ os.environ['INSTALLER_TYPE'] = self._installer_type
+ os.environ['DEPLOY_SCENARIO'] = self._deploy_scenario
+ os.environ['NODE_NAME'] = self._node_name
+ os.environ['BUILD_TAG'] = self._build_tag
+
+ def test_wraps(self):
+ self.assertEqual(functest_utils.push_results_to_db.__name__,
+ "push_results_to_db")
+
+ def _get_json(self):
+ stop_time = datetime.fromtimestamp(self._stop_time).strftime(
+ '%Y-%m-%d %H:%M:%S')
+ start_time = datetime.fromtimestamp(self._start_time).strftime(
+ '%Y-%m-%d %H:%M:%S')
+ data = {'project_name': self._project_name,
+ 'stop_date': stop_time, 'start_date': start_time,
+ 'case_name': self._case_name, 'build_tag': self._build_tag,
+ 'pod_name': self._node_name, 'installer': self._installer_type,
+ 'scenario': self._deploy_scenario, 'version': VERSION,
+ 'details': {}, 'criteria': self._result}
+ return json.dumps(data)
+
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value='http://127.0.0.1')
+ @mock.patch('{}.get_version'.format(functest_utils.__name__),
+ return_value=VERSION)
+ @mock.patch('requests.post')
+ def test_http_shema(self, *args):
+ self.assertTrue(functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ args[1].assert_called_once_with()
+ args[2].assert_called_once_with()
+ args[0].assert_called_once_with(
+ 'http://127.0.0.1', data=self._get_json(),
+ headers={'Content-Type': 'application/json'})
+
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value="/dev/null")
+ def test_wrong_shema(self, mock_method=None):
+ self.assertFalse(functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ mock_method.assert_called_once_with()
+
+ @mock.patch('{}.get_version'.format(functest_utils.__name__),
+ return_value=VERSION)
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value=URL)
+ def _test_dump(self, *args):
+ with mock.patch.object(decorators, 'open', mock.mock_open(),
+ create=True) as mock_open:
+ self.assertTrue(functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ mock_open.assert_called_once_with(FILE, 'a')
+ handle = mock_open()
+ call_args, _ = handle.write.call_args
+ self.assertIn('POST', call_args[0])
+ self.assertIn(self._get_json(), call_args[0])
+ args[0].assert_called_once_with()
+ args[1].assert_called_once_with()
+
+ @mock.patch('os.makedirs')
+ def test_default_dump(self, mock_method=None):
+ self._test_dump()
+ mock_method.assert_called_once_with(DIR)
+
+ @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
+ def test_makedirs_dir_exists(self, mock_method=None):
+ self._test_dump()
+ mock_method.assert_called_once_with(DIR)
+
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value=URL)
+ @mock.patch('os.makedirs', side_effect=OSError)
+ def test_makedirs_exc(self, *args):
+ self.assertFalse(
+ functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ args[0].assert_called_once_with(DIR)
+ args[1].assert_called_once_with()
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_functest_utils.py b/functest/tests/unit/utils/test_functest_utils.py
index 573fcb70..0fe7e91d 100644
--- a/functest/tests/unit/utils/test_functest_utils.py
+++ b/functest/tests/unit/utils/test_functest_utils.py
@@ -11,11 +11,11 @@ import logging
import os
import time
import unittest
-import urllib2
from git.exc import NoSuchPathError
import mock
import requests
+from six.moves import urllib
from functest.tests.unit import test_utils
from functest.utils import functest_utils
@@ -23,8 +23,6 @@ from functest.utils import functest_utils
class FunctestUtilsTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.url = 'http://www.opnfv.org/'
self.timeout = 5
@@ -62,31 +60,31 @@ class FunctestUtilsTesting(unittest.TestCase):
self.file_yaml = {'general': {'openstack': {'image_name':
'test_image_name'}}}
- @mock.patch('urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
+ @mock.patch('six.moves.urllib.request.urlopen',
+ side_effect=urllib.error.URLError('no host given'))
def test_check_internet_connectivity_failed(self, mock_method):
self.assertFalse(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_check_internet_connectivity_default(self, mock_method):
self.assertTrue(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_check_internet_connectivity_debian(self, mock_method):
self.url = "https://www.debian.org/"
self.assertTrue(functest_utils.check_internet_connectivity(self.url))
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
+ @mock.patch('six.moves.urllib.request.urlopen',
+ side_effect=urllib.error.URLError('no host given'))
def test_download_url_failed(self, mock_url):
self.assertFalse(functest_utils.download_url(self.url, self.dest_path))
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_download_url_default(self, mock_url):
- with mock.patch("__builtin__.open", mock.mock_open()) as m, \
+ with mock.patch("six.moves.builtins.open", mock.mock_open()) as m, \
mock.patch('functest.utils.functest_utils.shutil.copyfileobj')\
as mock_sh:
name = self.url.rsplit('/')[-1]
@@ -371,7 +369,7 @@ class FunctestUtilsTesting(unittest.TestCase):
attrs = {'readline.side_effect': self.readline_side}
m.configure_mock(**attrs)
- with mock.patch("__builtin__.open") as mo:
+ with mock.patch("six.moves.builtins.open") as mo:
mo.return_value = m
self.assertEqual(functest_utils.get_resolvconf_ns(),
self.test_ip[1:])
@@ -399,7 +397,8 @@ class FunctestUtilsTesting(unittest.TestCase):
mock_logger_error):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
- mock.patch('__builtin__.open', mock.mock_open()) as mopen:
+ mock.patch('six.moves.builtins.open',
+ mock.mock_open()) as mopen:
FunctestUtilsTesting.readline = 0
@@ -428,7 +427,8 @@ class FunctestUtilsTesting(unittest.TestCase):
):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
- mock.patch('__builtin__.open', mock.mock_open()) as mopen:
+ mock.patch('six.moves.builtins.open',
+ mock.mock_open()) as mopen:
FunctestUtilsTesting.readline = 0
@@ -503,7 +503,7 @@ class FunctestUtilsTesting(unittest.TestCase):
@mock.patch('functest.utils.functest_utils.logger.error')
def test_get_dict_by_test(self, mock_logger_error):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
mock.patch('functest.utils.functest_utils.get_testcases_'
@@ -531,7 +531,7 @@ class FunctestUtilsTesting(unittest.TestCase):
def test_get_parameter_from_yaml_failed(self):
self.file_yaml['general'] = None
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
self.assertRaises(ValueError) as excep:
@@ -543,7 +543,7 @@ class FunctestUtilsTesting(unittest.TestCase):
self.parameter) in excep.exception)
def test_get_parameter_from_yaml_default(self):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
@@ -561,22 +561,6 @@ class FunctestUtilsTesting(unittest.TestCase):
assert_called_once_with(self.parameter,
self.config_yaml)
- def test_check_success_rate_default(self):
- with mock.patch('functest.utils.functest_utils.get_criteria_by_test') \
- as mock_criteria:
- mock_criteria.return_value = self.criteria
- resp = functest_utils.check_success_rate(self.case_name,
- self.result)
- self.assertEqual(resp, 'PASS')
-
- def test_check_success_rate_failed(self):
- with mock.patch('functest.utils.functest_utils.get_criteria_by_test') \
- as mock_criteria:
- mock_criteria.return_value = self.criteria
- resp = functest_utils.check_success_rate(self.case_name,
- 0)
- self.assertEqual(resp, 'FAIL')
-
# TODO: merge_dicts
def test_get_testcases_file_dir(self):
@@ -586,7 +570,7 @@ class FunctestUtilsTesting(unittest.TestCase):
"functest/ci/testcases.yaml")
def test_get_functest_yaml(self):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
@@ -601,4 +585,5 @@ class FunctestUtilsTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_openstack_clean.py b/functest/tests/unit/utils/test_openstack_clean.py
index 15669538..fe7b50d4 100644
--- a/functest/tests/unit/utils/test_openstack_clean.py
+++ b/functest/tests/unit/utils/test_openstack_clean.py
@@ -15,8 +15,6 @@ from functest.tests.unit import test_utils
class OSCleanTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def _get_instance(self, key):
mock_obj = mock.Mock()
attrs = {'id': 'id' + str(key), 'name': 'name' + str(key),
@@ -723,4 +721,5 @@ class OSCleanTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_openstack_snapshot.py b/functest/tests/unit/utils/test_openstack_snapshot.py
index 52744db1..d3f93994 100644
--- a/functest/tests/unit/utils/test_openstack_snapshot.py
+++ b/functest/tests/unit/utils/test_openstack_snapshot.py
@@ -14,8 +14,6 @@ from functest.utils import openstack_snapshot
class OSTackerTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def _get_instance(self, key):
mock_obj = mock.Mock()
attrs = {'id': 'id' + str(key), 'name': 'name' + str(key),
@@ -232,4 +230,5 @@ class OSTackerTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_openstack_tacker.py b/functest/tests/unit/utils/test_openstack_tacker.py
index 37d77a18..3c0fc3d0 100644
--- a/functest/tests/unit/utils/test_openstack_tacker.py
+++ b/functest/tests/unit/utils/test_openstack_tacker.py
@@ -17,8 +17,6 @@ from functest.tests.unit import test_utils
class OSTackerTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.tacker_client = mock.Mock()
self.getresponse = {'vnfds': [{'id': 'test_id'}],
@@ -522,4 +520,5 @@ class OSTackerTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py
index a7df264c..15b54057 100644
--- a/functest/tests/unit/utils/test_openstack_utils.py
+++ b/functest/tests/unit/utils/test_openstack_utils.py
@@ -17,8 +17,6 @@ from functest.utils import openstack_utils
class OSUtilsTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def _get_env_cred_dict(self, os_prefix=''):
return {'OS_USERNAME': os_prefix + 'username',
'OS_PASSWORD': os_prefix + 'password',
@@ -1839,4 +1837,5 @@ class OSUtilsTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/vnf/ims/test_clearwater.py b/functest/tests/unit/vnf/ims/test_clearwater.py
index 18bebfdf..bc31c33e 100644
--- a/functest/tests/unit/vnf/ims/test_clearwater.py
+++ b/functest/tests/unit/vnf/ims/test_clearwater.py
@@ -16,8 +16,6 @@ from functest.opnfv_tests.vnf.ims import orchestrator_cloudify
class ClearwaterTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.clearwater = clearwater.Clearwater()
self.orchestrator = orchestrator_cloudify.Orchestrator('test_dir')
@@ -83,4 +81,5 @@ class ClearwaterTesting(unittest.TestCase):
'test_domain')
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/vnf/ims/test_cloudify_ims.py b/functest/tests/unit/vnf/ims/test_cloudify_ims.py
index f47ea865..c3c04e1d 100644
--- a/functest/tests/unit/vnf/ims/test_cloudify_ims.py
+++ b/functest/tests/unit/vnf/ims/test_cloudify_ims.py
@@ -15,8 +15,6 @@ from functest.opnfv_tests.vnf.ims import cloudify_ims
class CloudifyImsTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
'os.makedirs'), \
@@ -491,4 +489,5 @@ class CloudifyImsTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/vnf/ims/test_ims_base.py b/functest/tests/unit/vnf/ims/test_ims_base.py
index e283199c..db5b18d7 100644
--- a/functest/tests/unit/vnf/ims/test_ims_base.py
+++ b/functest/tests/unit/vnf/ims/test_ims_base.py
@@ -15,8 +15,6 @@ from functest.opnfv_tests.vnf.ims import clearwater_ims_base as ims_base
class ClearwaterOnBoardingBaseTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
'os.makedirs'):
@@ -55,4 +53,5 @@ class ClearwaterOnBoardingBaseTesting(unittest.TestCase):
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py b/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py
index bf6d483f..57064664 100644
--- a/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py
+++ b/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py
@@ -16,8 +16,6 @@ from functest.opnfv_tests.vnf.ims import orchestrator_cloudify
class ImsVnfTesting(unittest.TestCase):
- logging.disable(logging.CRITICAL)
-
def setUp(self):
self.orchestrator = orchestrator_cloudify.Orchestrator('test_dir')
self.bp = {'file_name': 'test_file',
@@ -174,4 +172,5 @@ class ImsVnfTesting(unittest.TestCase):
'test_subnet')
if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/utils/config.py b/functest/utils/config.py
index b5b84501..d91f63ac 100755..100644
--- a/functest/utils/config.py
+++ b/functest/utils/config.py
@@ -1,8 +1,11 @@
-import os
+#!/usr/bin/env python
+import os
import yaml
-import env
+import six
+
+from functest.utils import env
class Config(object):
@@ -16,7 +19,7 @@ class Config(object):
self._set_others()
def _parse(self, attr_now, left_parametes):
- for param_n, param_v in left_parametes.iteritems():
+ for param_n, param_v in six.iteritems(left_parametes):
attr_further = self._get_attr_further(attr_now, param_n)
if attr_further:
self.__setattr__(attr_further, param_v)
@@ -32,8 +35,3 @@ class Config(object):
CONF = Config()
-
-if __name__ == "__main__":
- print CONF.vnf_cloudify_ims
- print CONF.vnf_cloudify_ims_tenant_images
- print CONF.vnf_cloudify_ims_tenant_images_centos_7
diff --git a/functest/utils/constants.py b/functest/utils/constants.py
index 2e8eb3f4..75c97c76 100755..100644
--- a/functest/utils/constants.py
+++ b/functest/utils/constants.py
@@ -1,20 +1,17 @@
-import config
-import env
+#!/usr/bin/env python
+
+import six
+
+from functest.utils import config
+from functest.utils import env
class Constants(object):
def __init__(self):
- for attr_n, attr_v in config.CONF.__dict__.iteritems():
+ for attr_n, attr_v in six.iteritems(config.CONF.__dict__):
self.__setattr__(attr_n, attr_v)
- for env_n, env_v in env.ENV.__dict__.iteritems():
+ for env_n, env_v in six.iteritems(env.ENV.__dict__):
self.__setattr__(env_n, env_v)
CONST = Constants()
-
-if __name__ == '__main__':
- print CONST.__dict__
- print CONST.NODE_NAME
- print CONST.vIMS_clearwater_blueprint_url
- print CONST.vIMS_clearwater_blueprint_file_name
- print CONST.vIMS_clearwater_blueprint_name
diff --git a/functest/utils/decorators.py b/functest/utils/decorators.py
index 46ffe35d..73e0a352 100644
--- a/functest/utils/decorators.py
+++ b/functest/utils/decorators.py
@@ -3,18 +3,19 @@
# pylint: disable=missing-docstring
import errno
+import functools
import os
-import urlparse
import mock
import requests.sessions
+from six.moves import urllib
def can_dump_request_to_file(method):
def dump_preparedrequest(request, **kwargs):
# pylint: disable=unused-argument
- parseresult = urlparse.urlparse(request.url)
+ parseresult = urllib.parse.urlparse(request.url)
if parseresult.scheme == "file":
try:
dirname = os.path.dirname(parseresult.path)
@@ -33,7 +34,7 @@ def can_dump_request_to_file(method):
def patch_request(method, url, **kwargs):
with requests.sessions.Session() as session:
- parseresult = urlparse.urlparse(url)
+ parseresult = urllib.parse.urlparse(url)
if parseresult.scheme == "file":
with mock.patch.object(session, 'send',
side_effect=dump_preparedrequest):
@@ -41,6 +42,7 @@ def can_dump_request_to_file(method):
else:
return session.request(method=method, url=url, **kwargs)
+ @functools.wraps(method)
def hook(*args, **kwargs):
with mock.patch('requests.api.request', side_effect=patch_request):
return method(*args, **kwargs)
diff --git a/functest/utils/env.py b/functest/utils/env.py
index 7e4df2ea..c9629e15 100644
--- a/functest/utils/env.py
+++ b/functest/utils/env.py
@@ -1,6 +1,11 @@
+#!/usr/bin/env python
+
import os
import re
+import six
+
+
default_envs = {
'NODE_NAME': 'unknown_pod',
'CI_DEBUG': 'false',
@@ -17,9 +22,9 @@ default_envs = {
class Environment(object):
def __init__(self):
- for k, v in os.environ.iteritems():
+ for k, v in six.iteritems(os.environ):
self.__setattr__(k, v)
- for k, v in default_envs.iteritems():
+ for k, v in six.iteritems(default_envs):
if k not in os.environ:
self.__setattr__(k, v)
self._set_ci_run()
diff --git a/functest/utils/functest_utils.py b/functest/utils/functest_utils.py
index 3e85b9e4..bf30f56e 100644
--- a/functest/utils/functest_utils.py
+++ b/functest/utils/functest_utils.py
@@ -16,11 +16,11 @@ import shutil
import subprocess
import sys
import time
-import urllib2
from datetime import datetime as dt
import dns.resolver
import requests
+from six.moves import urllib
import yaml
from git import Repo
@@ -39,9 +39,9 @@ def check_internet_connectivity(url='http://www.opnfv.org/'):
Check if there is access to the internet
"""
try:
- urllib2.urlopen(url, timeout=5)
+ urllib.request.urlopen(url, timeout=5)
return True
- except urllib2.URLError:
+ except urllib.error.URLError:
return False
@@ -52,8 +52,8 @@ def download_url(url, dest_path):
name = url.rsplit('/')[-1]
dest = dest_path + "/" + name
try:
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
+ response = urllib.request.urlopen(url)
+ except (urllib.error.HTTPError, urllib.error.URLError):
return False
with open(dest, 'wb') as f:
@@ -318,7 +318,7 @@ def execute_command(cmd, info=False, error_msg="",
f.write(line)
else:
line = line.replace('\n', '')
- print line
+ print(line)
sys.stdout.flush()
if output_file:
f.close()
@@ -379,16 +379,6 @@ def get_functest_config(parameter):
return get_parameter_from_yaml(parameter, yaml_)
-def check_success_rate(case_name, result):
- # It should be removed as TestCase tests criteria
- # and result.
- logger.warning('check_success_rate will be removed soon')
- criteria = get_criteria_by_test(case_name)
- if type(criteria) == int and result >= criteria:
- return 'PASS'
- return 'FAIL'
-
-
def merge_dicts(dict1, dict2):
for k in set(dict1.keys()).union(dict2.keys()):
if k in dict1 and k in dict2:
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
index 7e00a269..8bd95052 100644
--- a/functest/utils/openstack_utils.py
+++ b/functest/utils/openstack_utils.py
@@ -286,7 +286,7 @@ def get_instances(nova_client):
try:
instances = nova_client.servers.list(search_opts={'all_tenants': 1})
return instances
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_instances(nova_client)]: %s" % e)
return None
@@ -295,7 +295,7 @@ def get_instance_status(nova_client, instance):
try:
instance = nova_client.servers.get(instance.id)
return instance.status
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_instance_status(nova_client)]: %s" % e)
return None
@@ -304,7 +304,7 @@ def get_instance_by_name(nova_client, instance_name):
try:
instance = nova_client.servers.find(name=instance_name)
return instance
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_instance_by_name(nova_client, '%s')]: %s"
% (instance_name, e))
return None
@@ -334,7 +334,7 @@ def get_aggregates(nova_client):
try:
aggregates = nova_client.aggregates.list()
return aggregates
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_aggregates(nova_client)]: %s" % e)
return None
@@ -344,7 +344,7 @@ def get_aggregate_id(nova_client, aggregate_name):
aggregates = get_aggregates(nova_client)
_id = [ag.id for ag in aggregates if ag.name == aggregate_name][0]
return _id
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_aggregate_id(nova_client, %s)]:"
" %s" % (aggregate_name, e))
return None
@@ -354,7 +354,7 @@ def get_availability_zones(nova_client):
try:
availability_zones = nova_client.availability_zones.list()
return availability_zones
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_availability_zones(nova_client)]: %s" % e)
return None
@@ -363,7 +363,7 @@ def get_availability_zone_names(nova_client):
try:
az_names = [az.zoneName for az in get_availability_zones(nova_client)]
return az_names
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_availability_zone_names(nova_client)]:"
" %s" % e)
return None
@@ -381,7 +381,7 @@ def create_flavor(nova_client, flavor_name, ram, disk, vcpus, public=True):
# flavor extra specs are not configured, therefore skip the update
pass
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
"'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
return None
@@ -414,7 +414,7 @@ def get_floating_ips(nova_client):
try:
floating_ips = nova_client.floating_ips.list()
return floating_ips
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_floating_ips(nova_client)]: %s" % e)
return None
@@ -427,7 +427,7 @@ def get_hypervisors(nova_client):
if hypervisor.state == "up":
nodes.append(hypervisor.hypervisor_hostname)
return nodes
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_hypervisors(nova_client)]: %s" % e)
return None
@@ -436,7 +436,7 @@ def create_aggregate(nova_client, aggregate_name, av_zone):
try:
nova_client.aggregates.create(aggregate_name, av_zone)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, av_zone, e))
return None
@@ -447,7 +447,7 @@ def add_host_to_aggregate(nova_client, aggregate_name, compute_host):
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.add_host(aggregate_id, compute_host)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_host_to_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, compute_host, e))
return None
@@ -459,7 +459,7 @@ def create_aggregate_with_host(
create_aggregate(nova_client, aggregate_name, av_zone)
add_host_to_aggregate(nova_client, aggregate_name, compute_host)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_aggregate_with_host("
"nova_client, %s, %s, %s)]: %s"
% (aggregate_name, av_zone, compute_host, e))
@@ -552,7 +552,7 @@ def create_floating_ip(neutron_client):
ip_json = neutron_client.create_floatingip({'floatingip': props})
fip_addr = ip_json['floatingip']['floating_ip_address']
fip_id = ip_json['floatingip']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
return None
return {'fip_addr': fip_addr, 'fip_id': fip_id}
@@ -562,7 +562,7 @@ def add_floating_ip(nova_client, server_id, floatingip_addr):
try:
nova_client.servers.add_floating_ip(server_id, floatingip_addr)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_floating_ip(nova_client, '%s', '%s')]: %s"
% (server_id, floatingip_addr, e))
return False
@@ -572,7 +572,7 @@ def delete_instance(nova_client, instance_id):
try:
nova_client.servers.force_delete(instance_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_instance(nova_client, '%s')]: %s"
% (instance_id, e))
return False
@@ -582,7 +582,7 @@ def delete_floating_ip(nova_client, floatingip_id):
try:
nova_client.floating_ips.delete(floatingip_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_floating_ip(nova_client, '%s')]: %s"
% (floatingip_id, e))
return False
@@ -593,7 +593,7 @@ def remove_host_from_aggregate(nova_client, aggregate_name, compute_host):
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.remove_host(aggregate_id, compute_host)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [remove_host_from_aggregate(nova_client, %s, %s)]:"
" %s" % (aggregate_name, compute_host, e))
return False
@@ -612,7 +612,7 @@ def delete_aggregate(nova_client, aggregate_name):
remove_hosts_from_aggregate(nova_client, aggregate_name)
nova_client.aggregates.delete(aggregate_name)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_aggregate(nova_client, %s)]: %s"
% (aggregate_name, e))
return False
@@ -715,7 +715,7 @@ def create_neutron_net(neutron_client, name):
network = neutron_client.create_network(body=json_body)
network_dict = network['network']
return network_dict['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
% (name, e))
return None
@@ -727,7 +727,7 @@ def create_neutron_subnet(neutron_client, name, cidr, net_id):
try:
subnet = neutron_client.create_subnet(body=json_body)
return subnet['subnets'][0]['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
return None
@@ -738,7 +738,7 @@ def create_neutron_router(neutron_client, name):
try:
router = neutron_client.create_router(json_body)
return router['router']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
% (name, e))
return None
@@ -754,7 +754,7 @@ def create_neutron_port(neutron_client, name, network_id, ip):
try:
port = neutron_client.create_port(body=json_body)
return port['port']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
return None
@@ -765,7 +765,7 @@ def update_neutron_net(neutron_client, network_id, shared=False):
try:
neutron_client.update_network(network_id, body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
"%s" % (network_id, str(shared), e))
return False
@@ -779,7 +779,7 @@ def update_neutron_port(neutron_client, port_id, device_owner):
port = neutron_client.update_port(port=port_id,
body=json_body)
return port['port']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
return None
@@ -790,7 +790,7 @@ def add_interface_router(neutron_client, router_id, subnet_id):
try:
neutron_client.add_interface_router(router=router_id, body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
@@ -802,7 +802,7 @@ def add_gateway_router(neutron_client, router_id):
try:
neutron_client.add_gateway_router(router_id, router_dict)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
@@ -812,7 +812,7 @@ def delete_neutron_net(neutron_client, network_id):
try:
neutron_client.delete_network(network_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
% (network_id, e))
return False
@@ -822,7 +822,7 @@ def delete_neutron_subnet(neutron_client, subnet_id):
try:
neutron_client.delete_subnet(subnet_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
% (subnet_id, e))
return False
@@ -832,7 +832,7 @@ def delete_neutron_router(neutron_client, router_id):
try:
neutron_client.delete_router(router=router_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
@@ -842,7 +842,7 @@ def delete_neutron_port(neutron_client, port_id):
try:
neutron_client.delete_port(port_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
% (port_id, e))
return False
@@ -854,7 +854,7 @@ def remove_interface_router(neutron_client, router_id, subnet_id):
neutron_client.remove_interface_router(router=router_id,
body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [remove_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
@@ -864,7 +864,7 @@ def remove_gateway_router(neutron_client, router_id):
try:
neutron_client.remove_gateway_router(router_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
@@ -994,7 +994,7 @@ def get_security_groups(neutron_client):
security_groups = neutron_client.list_security_groups()[
'security_groups']
return security_groups
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
return None
@@ -1015,7 +1015,7 @@ def create_security_group(neutron_client, sg_name, sg_description):
try:
secgroup = neutron_client.create_security_group(json_body)
return secgroup['security_group']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_security_group(neutron_client, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
return None
@@ -1070,7 +1070,7 @@ def get_security_group_rules(neutron_client, sg_id):
security_rules = [rule for rule in security_rules
if rule["security_group_id"] == sg_id]
return security_rules
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
" %s" % e)
return None
@@ -1089,7 +1089,7 @@ def check_security_group_rules(neutron_client, sg_id, direction, protocol,
return True
else:
return False
- except Exception, e:
+ except Exception as e:
logger.error("Error [check_security_group_rules("
" neutron_client, sg_id, direction,"
" protocol, port_min=None, port_max=None)]: "
@@ -1141,7 +1141,7 @@ def add_secgroup_to_instance(nova_client, instance_id, secgroup_id):
try:
nova_client.servers.add_security_group(instance_id, secgroup_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_secgroup_to_instance(nova_client, '%s', "
"'%s')]: %s" % (instance_id, secgroup_id, e))
return False
@@ -1157,7 +1157,7 @@ def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
neutron_client.update_quota(tenant_id=tenant_id,
body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
"'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
return False
@@ -1167,7 +1167,7 @@ def delete_security_group(neutron_client, secgroup_id):
try:
neutron_client.delete_security_group(secgroup_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
% (secgroup_id, e))
return False
@@ -1180,7 +1180,7 @@ def get_images(nova_client):
try:
images = nova_client.images.list()
return images
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_images]: %s" % e)
return None
@@ -1216,7 +1216,7 @@ def create_glance_image(glance_client, image_name, file_path, disk="qcow2",
with open(file_path) as image_data:
glance_client.images.upload(image_id, image_data)
return image_id
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
"'%s')]: %s" % (image_name, file_path, public, e))
return None
@@ -1246,7 +1246,7 @@ def delete_glance_image(nova_client, image_id):
try:
nova_client.images.delete(image_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_glance_image(nova_client, '%s')]: %s"
% (image_id, e))
return False
@@ -1259,7 +1259,7 @@ def get_volumes(cinder_client):
try:
volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
return volumes
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_volumes(cinder_client)]: %s" % e)
return None
@@ -1272,7 +1272,7 @@ def list_volume_types(cinder_client, public=True, private=True):
if not private:
volume_types = [vt for vt in volume_types if vt.is_public]
return volume_types
- except Exception, e:
+ except Exception as e:
logger.error("Error [list_volume_types(cinder_client)]: %s" % e)
return None
@@ -1281,7 +1281,7 @@ def create_volume_type(cinder_client, name):
try:
volume_type = cinder_client.volume_types.create(name)
return volume_type
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_volume_type(cinder_client, '%s')]: %s"
% (name, e))
return None
@@ -1296,7 +1296,7 @@ def update_cinder_quota(cinder_client, tenant_id, vols_quota,
try:
cinder_client.quotas.update(tenant_id, **quotas_values)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', "
"'%s' '%s')]: %s" % (tenant_id, vols_quota,
snapshots_quota, gigabytes_quota, e))
@@ -1314,7 +1314,7 @@ def delete_volume(cinder_client, volume_id, forced=False):
else:
cinder_client.volumes.delete(volume_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s"
% (volume_id, str(forced), e))
return False
@@ -1324,7 +1324,7 @@ def delete_volume_type(cinder_client, volume_type):
try:
cinder_client.volume_types.delete(volume_type)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_volume_type(cinder_client, '%s')]: %s"
% (volume_type, e))
return False
@@ -1340,7 +1340,7 @@ def get_tenants(keystone_client):
else:
tenants = keystone_client.tenants.list()
return tenants
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_tenants(keystone_client)]: %s" % e)
return None
@@ -1349,7 +1349,7 @@ def get_users(keystone_client):
try:
users = keystone_client.users.list()
return users
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_users(keystone_client)]: %s" % e)
return None
@@ -1397,7 +1397,7 @@ def create_tenant(keystone_client, tenant_name, tenant_description):
tenant_description,
enabled=True)
return tenant.id
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s"
% (tenant_name, tenant_description, e))
return None
@@ -1428,7 +1428,7 @@ def create_user(keystone_client, user_name, user_password,
tenant_id,
enabled=True)
return user.id
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'"
"'%s')]: %s" % (user_name, user_password,
user_email, tenant_id, e))
@@ -1453,7 +1453,7 @@ def add_role_user(keystone_client, user_id, role_id, tenant_id):
else:
keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_role_user(keystone_client, '%s', '%s'"
"'%s')]: %s " % (user_id, role_id, tenant_id, e))
return False
@@ -1466,7 +1466,7 @@ def delete_tenant(keystone_client, tenant_id):
else:
keystone_client.tenants.delete(tenant_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_tenant(keystone_client, '%s')]: %s"
% (tenant_id, e))
return False
@@ -1476,7 +1476,7 @@ def delete_user(keystone_client, user_id):
try:
keystone_client.users.delete(user_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_user(keystone_client, '%s')]: %s"
% (user_id, e))
return False
@@ -1489,6 +1489,6 @@ def get_resource(heat_client, stack_id, resource):
try:
resources = heat_client.resources.get(stack_id, resource)
return resources
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_resource]: %s" % e)
return None