summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--functest/cli/commands/cli_env.py11
-rw-r--r--functest/core/unit.py7
-rw-r--r--functest/core/vnf.py334
-rwxr-xr-xfunctest/opnfv_tests/vnf/aaa/aaa.py53
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater_ims_base.py18
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.py116
-rw-r--r--functest/opnfv_tests/vnf/ims/opera_ims.py8
-rwxr-xr-xfunctest/opnfv_tests/vnf/ims/orchestra_ims.py188
-rw-r--r--functest/tests/unit/cli/commands/test_cli_env.py14
-rw-r--r--functest/tests/unit/core/test_unit.py12
-rw-r--r--functest/tests/unit/core/test_vnf.py263
-rw-r--r--functest/tests/unit/utils/test_functest_utils.py27
-rw-r--r--functest/tests/unit/utils/test_openstack_utils.py71
-rw-r--r--functest/tests/unit/vnf/ims/test_cloudify_ims.py31
-rw-r--r--functest/tests/unit/vnf/ims/test_ims_base.py17
-rw-r--r--functest/utils/env.py5
-rw-r--r--functest/utils/functest_utils.py10
-rw-r--r--functest/utils/openstack_utils.py68
-rw-r--r--requirements.txt52
-rw-r--r--setup.py17
-rw-r--r--test-requirements.txt17
-rw-r--r--tox.ini8
22 files changed, 650 insertions, 697 deletions
diff --git a/functest/cli/commands/cli_env.py b/functest/cli/commands/cli_env.py
index b43116fcd..8094c84e5 100644
--- a/functest/cli/commands/cli_env.py
+++ b/functest/cli/commands/cli_env.py
@@ -10,7 +10,6 @@
import os
import click
-import git
import prettytable
from functest.utils.constants import CONST
@@ -49,14 +48,6 @@ class CliEnv(object):
installer_info = ("%s, %s" % (install_type, installer_ip))
scenario = _get_value(CONST.__getattribute__('DEPLOY_SCENARIO'))
node = _get_value(CONST.__getattribute__('NODE_NAME'))
- repo_h = git.Repo(CONST.__getattribute__('dir_repo_functest')).head
- if repo_h.is_detached:
- git_branch = 'detached from FETCH_HEAD'
- git_hash = repo_h.commit.hexsha
- else:
- branch = repo_h.reference
- git_branch = branch.name
- git_hash = branch.commit.hexsha
is_debug = _get_value(CONST.__getattribute__('CI_DEBUG'), 'false')
build_tag = CONST.__getattribute__('BUILD_TAG')
if build_tag is not None:
@@ -73,8 +64,6 @@ class CliEnv(object):
msg.add_row(['INSTALLER', installer_info])
msg.add_row(['SCENARIO', scenario])
msg.add_row(['POD', node])
- msg.add_row(['GIT BRANCH', git_branch])
- msg.add_row(['GIT HASH', git_hash])
if build_tag:
msg.add_row(['BUILD TAG', build_tag])
msg.add_row(['DEBUG FLAG', is_debug])
diff --git a/functest/core/unit.py b/functest/core/unit.py
index 515a20806..61b5a58d9 100644
--- a/functest/core/unit.py
+++ b/functest/core/unit.py
@@ -74,8 +74,11 @@ class Suite(testcase.TestCase):
stream=stream, verbosity=2).run(self.suite)
self.__logger.debug("\n\n%s", stream.getvalue())
self.stop_time = time.time()
- self.details = {"failures": result.failures,
- "errors": result.errors}
+ self.details = {
+ "testsRun": result.testsRun,
+ "failures": len(result.failures),
+ "errors": len(result.errors),
+ "stream": stream.getvalue()}
self.result = 100 * (
(result.testsRun - (len(result.failures) +
len(result.errors))) /
diff --git a/functest/core/vnf.py b/functest/core/vnf.py
index 5667b2997..0589b5d2a 100644
--- a/functest/core/vnf.py
+++ b/functest/core/vnf.py
@@ -7,220 +7,182 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-import inspect
+"""Define the parent class of all VNF TestCases."""
+
import logging
import time
import functest.core.testcase as base
from functest.utils.constants import CONST
-import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
+__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, "
+ "Valentin Boucher <valentin.boucher@orange.com>")
+
+
+class VnfPreparationException(Exception):
+ """Raise when VNF preparation cannot be executed."""
+
+
+class OrchestratorDeploymentException(Exception):
+ """Raise when orchestrator cannot be deployed."""
+
+
+class VnfDeploymentException(Exception):
+ """Raise when VNF cannot be deployed."""
+
+
+class VnfTestException(Exception):
+ """Raise when VNF cannot be tested."""
+
class VnfOnBoarding(base.TestCase):
+ """Base model for VNF test cases."""
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
super(VnfOnBoarding, self).__init__(**kwargs)
- self.repo = kwargs.get('repo', '')
- self.cmd = kwargs.get('cmd', '')
- self.details = {}
- self.result_dir = CONST.__getattribute__('dir_results')
- self.details_step_mapping = dict(
- deploy_orchestrator='orchestrator',
- deploy_vnf='vnf',
- test_vnf='test_vnf',
- prepare='prepare_env')
- self.details['prepare_env'] = {}
- self.details['orchestrator'] = {}
- self.details['vnf'] = {}
- self.details['test_vnf'] = {}
- self.images = {}
- try:
- self.tenant_name = CONST.__getattribute__(
- 'vnf_{}_tenant_name'.format(self.case_name))
- self.tenant_description = CONST.__getattribute__(
- 'vnf_{}_tenant_description'.format(self.case_name))
- except Exception:
- # raise Exception("Unknown VNF case=" + self.case_name)
- self.__logger.error("Unknown VNF case={}".format(self.case_name))
-
- try:
- self.images = CONST.__getattribute__(
- 'vnf_{}_tenant_images'.format(self.case_name))
- except Exception:
- self.__logger.warn("No tenant image defined for this VNF")
-
- def execute(self):
+ self.tenant_created = False
+ self.user_created = False
+ self.tenant_name = CONST.__getattribute__(
+ 'vnf_{}_tenant_name'.format(self.case_name))
+ self.tenant_description = CONST.__getattribute__(
+ 'vnf_{}_tenant_description'.format(self.case_name))
+
+ def run(self, **kwargs):
+ """
+ Run of the VNF test case:
+
+ * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP),
+ * Deploy the VNF,
+ * Perform tests on the VNF
+
+ A VNF test case is successfull when the 3 steps are PASS
+ If one of the step is FAIL, the test case is FAIL
+
+ Returns:
+ TestCase.EX_OK if result is 'PASS'.
+ TestCase.EX_TESTCASE_FAILED otherwise.
+ """
self.start_time = time.time()
- # Prepare the test (Create Tenant, User, ...)
+
try:
- self.__logger.info("Create VNF Onboarding environment")
self.prepare()
- except Exception:
- self.__logger.error("Error during VNF Onboarding environment"
- "creation", exc_info=True)
+ if (self.deploy_orchestrator() and
+ self.deploy_vnf() and
+ self.test_vnf()):
+ self.stop_time = time.time()
+ # Calculation with different weight depending on the steps TODO
+ self.result = 100
+ return base.TestCase.EX_OK
+ else:
+ self.result = 0
+ return base.TestCase.EX_TESTCASE_FAILED
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Exception on VNF testing")
return base.TestCase.EX_TESTCASE_FAILED
- # Deploy orchestrator
- try:
- self.__logger.info("Deploy orchestrator (if necessary)")
- orchestrator_ready_time = time.time()
- res_orchestrator = self.deploy_orchestrator()
- # orchestrator is not mandatory
- if res_orchestrator is not None:
- self.details['orchestrator']['status'] = (
- res_orchestrator['status'])
- self.details['orchestrator']['result'] = (
- res_orchestrator['result'])
- self.details['orchestrator']['duration'] = round(
- orchestrator_ready_time - self.start_time, 1)
- except Exception:
- self.__logger.warn("Problem with the Orchestrator", exc_info=True)
-
- # Deploy VNF
- try:
- self.__logger.info("Deploy VNF " + self.case_name)
- res_deploy_vnf = self.deploy_vnf()
- vnf_ready_time = time.time()
- self.details['vnf']['status'] = res_deploy_vnf['status']
- self.details['vnf']['result'] = res_deploy_vnf['result']
- self.details['vnf']['duration'] = round(
- vnf_ready_time - orchestrator_ready_time, 1)
- except Exception:
- self.__logger.error("Error during VNF deployment", exc_info=True)
- return base.TestCase.EX_TESTCASE_FAILED
+ def prepare(self):
+ """
+ Prepare the environment for VNF testing:
- # Test VNF
- try:
- self.__logger.info("Test VNF")
- res_test_vnf = self.test_vnf()
- test_vnf_done_time = time.time()
- self.details['test_vnf']['status'] = res_test_vnf['status']
- self.details['test_vnf']['result'] = res_test_vnf['result']
- self.details['test_vnf']['duration'] = round(
- test_vnf_done_time - vnf_ready_time, 1)
- except Exception:
- self.__logger.error("Error when running VNF tests", exc_info=True)
- return base.TestCase.EX_TESTCASE_FAILED
+ * Creation of a user,
+ * Creation of a tenant,
+ * Allocation admin role to the user on this tenant
- # Clean the system
- self.clean()
- self.stop_time = time.time()
+ Returns base.TestCase.EX_OK if preparation is successfull
- exit_code = self.parse_results()
- self.log_results()
- return exit_code
+ Raise VnfPreparationException in case of problem
+ """
+ try:
+ self.__logger.info("Prepare VNF: %s, description: %s",
+ self.tenant_name, self.tenant_description)
+ admin_creds = os_utils.get_credentials()
+ keystone_client = os_utils.get_keystone_client()
+ self.tenant_created = os_utils.get_or_create_tenant_for_vnf(
+ keystone_client, self.tenant_name, self.tenant_description)
+ self.user_created = os_utils.get_or_create_user_for_vnf(
+ keystone_client, self.tenant_name)
+ creds = admin_creds.copy()
+ creds.update({
+ "tenant": self.tenant_name,
+ "username": self.tenant_name,
+ "password": self.tenant_name
+ })
+ return base.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Exception raised during VNF preparation")
+ raise VnfPreparationException
+
+ def deploy_orchestrator(self):
+ """
+ Deploy an orchestrator (optional).
+
+ If function overwritten
+ raise orchestratorDeploymentException if error during orchestrator
+ deployment
+ """
+ self.__logger.info("Deploy orchestrator (if necessary)")
+ return True
- # prepare state could consist in the creation of the resources
- # a dedicated user
- # a dedicated tenant
- # dedicated images
- def prepare(self):
- self.creds = os_utils.get_credentials()
- self.keystone_client = os_utils.get_keystone_client()
-
- self.__logger.info(
- "Prepare OpenStack plateform(create tenant and user)")
- admin_user_id = os_utils.get_user_id(self.keystone_client,
- self.creds['username'])
- if not admin_user_id:
- self.step_failure("Failed to get id of {0}".format(
- self.creds['username']))
-
- tenant_id = os_utils.get_tenant_id(self.keystone_client,
- self.tenant_name)
- if not tenant_id:
- tenant_id = os_utils.create_tenant(self.keystone_client,
- self.tenant_name,
- self.tenant_description)
- if not tenant_id:
- self.step_failure("Failed to get or create {0} tenant".format(
- self.tenant_name))
- roles_name = ["admin", "Admin"]
- role_id = ''
- for role_name in roles_name:
- if not role_id:
- role_id = os_utils.get_role_id(self.keystone_client,
- role_name)
-
- if not role_id:
- self.step_failure("Failed to get id for {0} role".format(
- role_name))
-
- if not os_utils.add_role_user(self.keystone_client, admin_user_id,
- role_id, tenant_id):
- self.step_failure("Failed to add {0} on tenant".format(
- self.creds['username']))
-
- user_id = os_utils.get_or_create_user(self.keystone_client,
- self.tenant_name,
- self.tenant_name,
- tenant_id)
- if not user_id:
- self.step_failure("Failed to get or create {0} user".format(
- self.tenant_name))
-
- os_utils.add_role_user(self.keystone_client, user_id,
- role_id, tenant_id)
-
- self.__logger.info("Update OpenStack creds informations")
- self.admin_creds = self.creds.copy()
- self.admin_creds.update({
- "tenant": self.tenant_name
- })
- self.neutron_client = os_utils.get_neutron_client(self.admin_creds)
- self.nova_client = os_utils.get_nova_client(self.admin_creds)
- self.creds.update({
- "tenant": self.tenant_name,
- "username": self.tenant_name,
- "password": self.tenant_name,
- })
-
- # orchestrator is not mandatory to deploy and test VNF
- def deploy_orchestrator(self, **kwargs):
- pass
-
- # TODO see how to use built-in exception from releng module
def deploy_vnf(self):
+ """
+ Deploy the VNF
+
+ This function MUST be implemented by vnf test cases.
+ The details section MAY be updated in the vnf test cases.
+
+ The deployment can be executed via a specific orchestrator
+ or using nuild-in orchestrators such as:
+
+ * heat, openbaton, cloudify (available on all scenario),
+ * open-o (on open-o scenarios)
+
+ Returns:
+ True if the VNF is properly deployed
+ False if the VNF is not deployed
+
+ Raise VnfDeploymentException if error during VNF deployment
+ """
self.__logger.error("VNF must be deployed")
- raise Exception("VNF not deployed")
+ raise VnfDeploymentException
def test_vnf(self):
+ """
+ Test the VNF
+
+ This function MUST be implemented by vnf test cases.
+ The details section MAY be updated in the vnf test cases.
+
+ Once a VNF is deployed, it is assumed that specific test suite can be
+ run to validate the VNF.
+ Please note that the same test suite can be used on several test case
+ (e.g. clearwater test suite can be used whatever the orchestrator used
+ for the deployment)
+
+ Returns:
+ True if VNF tests are PASS
+ False if test suite is FAIL
+
+ Raise VnfTestException if error during VNF test
+ """
self.__logger.error("VNF must be tested")
- raise Exception("VNF not tested")
+ raise VnfTestException
- # clean before openstack clean run
def clean(self):
- self.__logger.info("test cleaning")
+ """
+ Clean VNF test case.
- def parse_results(self):
- exit_code = self.EX_OK
- self.result = "PASS"
- self.__logger.info(self.details)
- # The 2 VNF steps must be OK to get a PASS result
- if (self.details['vnf']['status'] is not "PASS" or
- self.details['test_vnf']['status'] is not "PASS"):
- exit_code = self.EX_RUN_ERROR
- self.result = "FAIL"
- return exit_code
-
- def log_results(self):
- ft_utils.logger_test_results(self.project_name,
- self.case_name,
- self.result,
- self.details)
-
- def step_failure(self, error_msg):
- part = inspect.stack()[1][3]
- self.__logger.error("Step {0} failed: {1}".format(part, error_msg))
- try:
- step_name = self.details_step_mapping[part]
- part_info = self.details[step_name]
- except KeyError:
- self.details[part] = {}
- part_info = self.details[part]
- part_info['status'] = 'FAIL'
- part_info['result'] = error_msg
- raise Exception(error_msg)
+ It is up to the test providers to delete resources used for the tests.
+ By default we clean:
+
+ * the user,
+ * the tenant
+ """
+ self.__logger.info("test cleaning")
+ keystone_client = os_utils.get_keystone_client()
+ if self.tenant_created:
+ os_utils.delete_tenant(keystone_client, self.tenant_name)
+ if self.user_created:
+ os_utils.delete_user(keystone_client, self.tenant_name)
diff --git a/functest/opnfv_tests/vnf/aaa/aaa.py b/functest/opnfv_tests/vnf/aaa/aaa.py
index 0030256c2..71e3c972a 100755
--- a/functest/opnfv_tests/vnf/aaa/aaa.py
+++ b/functest/opnfv_tests/vnf/aaa/aaa.py
@@ -8,15 +8,12 @@
# http://www.apache.org/licenses/LICENSE-2.0
import logging
-import sys
-import argparse
-
-import functest.core.testcase as testcase
import functest.core.vnf as vnf
class AaaVnf(vnf.VnfOnBoarding):
+ """AAA VNF sample"""
logger = logging.getLogger(__name__)
@@ -27,48 +24,18 @@ class AaaVnf(vnf.VnfOnBoarding):
def deploy_orchestrator(self):
self.logger.info("No VNFM needed to deploy a free radius here")
- return None
+ return True
-# TODO see how to use build in exception form releng module
def deploy_vnf(self):
self.logger.info("Freeradius VNF deployment")
- # TODO apt-get update + config tuning
- deploy_vnf = {}
- deploy_vnf['status'] = "PASS"
- deploy_vnf['result'] = {}
- return deploy_vnf
+ # find a way to deploy freeradius and tester (heat,manual, ..)
+ deploy_vnf = {'status': 'PASS', 'version': 'xxxx'}
+ self.details['deploy_vnf'] = deploy_vnf
+ return True
def test_vnf(self):
self.logger.info("Run test towards freeradius")
- # TODO: once the freeradius is deployed..make some tests
- test_vnf = {}
- test_vnf['status'] = "PASS"
- test_vnf['result'] = {}
- return test_vnf
-
- def main(self, **kwargs):
- self.logger.info("AAA VNF onboarding")
- self.execute()
- if self.result is "PASS":
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- kwargs = {}
- return self.main(**kwargs)
-
-
-if __name__ == '__main__':
- logging.basicConfig()
- parser = argparse.ArgumentParser()
- args = vars(parser.parse_args())
- aaa_vnf = AaaVnf()
- try:
- result = aaa_vnf.main(**args)
- if result != testcase.TestCase.EX_OK:
- sys.exit(result)
- if args['pushtodb']:
- sys.exit(aaa_vnf.push_to_db())
- except Exception:
- sys.exit(testcase.TestCase.EX_RUN_ERROR)
+ # once the freeradius is deployed..make some tests
+ test_vnf = {'status': 'PASS', 'version': 'xxxx'}
+ self.details['test_vnf'] = test_vnf
+ return True
diff --git a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
index 42d31e3bb..a645dfb2f 100644
--- a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
+++ b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
@@ -17,16 +17,24 @@ import functest.core.vnf as vnf
from functest.utils.constants import CONST
import functest.utils.functest_utils as ft_utils
+__author__ = ("Valentin Boucher <valentin.boucher@orange.com>, "
+ "Helen Yao <helanyao@gmail.com>")
+
class ClearwaterOnBoardingBase(vnf.VnfOnBoarding):
+ """ vIMS clearwater base usable by several orchestrators"""
def __init__(self, **kwargs):
self.logger = logging.getLogger(__name__)
super(ClearwaterOnBoardingBase, self).__init__(**kwargs)
- self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf', 'ims')
- self.data_dir = CONST.dir_ims_data
- self.result_dir = os.path.join(CONST.dir_results, self.case_name)
- self.test_dir = CONST.dir_repo_vims_test
+ self.case_dir = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ 'vnf',
+ 'ims')
+ self.data_dir = CONST.__getattribute__('dir_ims_data')
+ self.result_dir = os.path.join(CONST.__getattribute__('dir_results'),
+ self.case_name)
+ self.test_dir = CONST.__getattribute__('dir_repo_vims_test')
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
@@ -58,7 +66,7 @@ class ClearwaterOnBoardingBase(vnf.VnfOnBoarding):
if rq.status_code != 201:
raise Exception('Failed to get cookie for Ellis')
cookies = rq.cookies
- self.logger.info('Cookies: %s' % cookies)
+ self.logger.info('Cookies: %s', cookies)
number_url = 'http://{0}/accounts/{1}/numbers'.format(
ellis_ip,
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
index ba4c57901..89c84afdd 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
@@ -22,14 +22,21 @@ from functest.utils.constants import CONST
import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
+__author__ = "Valentin Boucher <valentin.boucher@orange.com>"
+
class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
+ """Clearwater vIMS deployed with Cloudify Orchestrator Case"""
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
super(CloudifyIms, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
+ self.neutron_client = ''
+ self.glance_client = ''
+ self.keystone_client = ''
+ self.nova_client = ''
# Retrieve the configuration
try:
@@ -44,7 +51,7 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
blueprint=get_config("cloudify.blueprint", config_file),
inputs=get_config("cloudify.inputs", config_file)
)
- self.logger.debug("Orchestrator configuration: %s" % self.orchestrator)
+ self.logger.debug("Orchestrator configuration: %s", self.orchestrator)
self.vnf = dict(
blueprint=get_config("clearwater.blueprint", config_file),
deployment_name=get_config("clearwater.deployment_name",
@@ -52,12 +59,12 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
inputs=get_config("clearwater.inputs", config_file),
requirements=get_config("clearwater.requirements", config_file)
)
- self.logger.debug("VNF configuration: %s" % self.vnf)
+ self.logger.debug("VNF configuration: %s", self.vnf)
self.images = get_config("tenant_images", config_file)
- self.logger.info("Images needed for vIMS: %s" % self.images)
+ self.logger.info("Images needed for vIMS: %s", self.images)
- def deploy_orchestrator(self, **kwargs):
+ def deploy_orchestrator(self):
self.logger.info("Additional pre-configuration steps")
self.neutron_client = os_utils.get_neutron_client(self.admin_creds)
@@ -69,45 +76,45 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
self.logger.info("Upload some OS images if it doesn't exist")
temp_dir = os.path.join(self.data_dir, "tmp/")
for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s" % (image_name, image_url))
+ self.logger.info("image: %s, url: %s", image_name, image_url)
try:
image_id = os_utils.get_image_id(self.glance_client,
image_name)
- self.logger.debug("image_id: %s" % image_id)
+ self.logger.debug("image_id: %s", image_id)
except Exception:
- self.logger.error("Unexpected error: %s" % sys.exc_info()[0])
+ self.logger.error("Unexpected error: %s", sys.exc_info()[0])
if image_id == '':
- self.logger.info("""%s image doesn't exist on glance repository. Try
- downloading this image and upload on glance !""" % image_name)
- image_id = download_and_add_image_on_glance(self.glance_client,
- image_name,
- image_url,
- temp_dir)
- if image_id == '':
- self.step_failure(
- "Failed to find or upload required OS "
- "image for this deployment")
+ self.logger.info("""%s image does not exist on glance repo.
+ Try downloading this image
+ and upload on glance !""",
+ image_name)
+ image_id = os_utils.download_and_add_image_on_glance(
+ self.glance_client,
+ image_name,
+ image_url,
+ temp_dir)
# Need to extend quota
self.logger.info("Update security group quota for this tenant")
tenant_id = os_utils.get_tenant_id(self.keystone_client,
self.tenant_name)
- self.logger.debug("Tenant id found %s" % tenant_id)
+ self.logger.debug("Tenant id found %s", tenant_id)
if not os_utils.update_sg_quota(self.neutron_client,
tenant_id, 50, 100):
- self.step_failure("Failed to update security group quota" +
+ self.logger.error("Failed to update security group quota"
" for tenant " + self.tenant_name)
+
self.logger.debug("group quota extended")
# start the deployment of cloudify
public_auth_url = os_utils.get_endpoint('identity')
- self.logger.debug("CFY inputs: %s" % self.orchestrator['inputs'])
+ self.logger.debug("CFY inputs: %s", self.orchestrator['inputs'])
cfy = Orchestrator(self.data_dir, self.orchestrator['inputs'])
self.orchestrator['object'] = cfy
self.logger.debug("Orchestrator object created")
- self.logger.debug("Tenant name: %s" % self.tenant_name)
+ self.logger.debug("Tenant name: %s", self.tenant_name)
cfy.set_credentials(username=self.tenant_name,
password=self.tenant_name,
@@ -117,7 +124,7 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
# orchestrator VM flavor
self.logger.info("Check Flavor is available, if not create one")
- self.logger.debug("Flavor details %s " %
+ self.logger.debug("Flavor details %s ",
self.orchestrator['requirements']['ram_min'])
flavor_exist, flavor_id = os_utils.get_or_create_flavor(
"m1.large",
@@ -125,13 +132,12 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
'50',
'2',
public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
+ self.logger.debug("Flavor id: %s", flavor_id)
if not flavor_id:
self.logger.info("Available flavors are: ")
self.logger.info(self.nova_client.flavor.list())
- self.step_failure("Failed to find required flavor"
- "for this deployment")
+
cfy.set_flavor_id(flavor_id)
self.logger.debug("Flavor OK")
@@ -141,23 +147,16 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
image_id = os_utils.get_image_id(
self.glance_client,
self.orchestrator['requirements']['os_image'])
- self.logger.debug("Orchestrator image id: %s" % image_id)
+ self.logger.debug("Orchestrator image id: %s", image_id)
if image_id == '':
self.logger.error("CFY image not found")
- self.step_failure("Failed to find required OS image"
- " for cloudify manager")
- else:
- self.step_failure("Failed to find required OS image"
- " for cloudify manager")
cfy.set_image_id(image_id)
self.logger.debug("Orchestrator image set")
self.logger.debug("Get External network")
ext_net = os_utils.get_external_net(self.neutron_client)
- self.logger.debug("External network: %s" % ext_net)
- if not ext_net:
- self.step_failure("Failed to get external network")
+ self.logger.debug("External network: %s", ext_net)
cfy.set_external_network_name(ext_net)
self.logger.debug("CFY External network set")
@@ -199,12 +198,10 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
'30',
'1',
public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
+ self.logger.debug("Flavor id: %s", flavor_id)
if not flavor_id:
self.logger.info("Available flavors are: ")
self.logger.info(self.nova_client.flavor.list())
- self.step_failure("Failed to find required flavor"
- " for this deployment")
cw.set_flavor_id(flavor_id)
@@ -212,19 +209,9 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
if 'os_image' in self.vnf['requirements'].keys():
image_id = os_utils.get_image_id(
self.glance_client, self.vnf['requirements']['os_image'])
- if image_id == '':
- self.step_failure("Failed to find required OS image"
- " for clearwater VMs")
- else:
- self.step_failure("Failed to find required OS image"
- " for clearwater VMs")
cw.set_image_id(image_id)
-
ext_net = os_utils.get_external_net(self.neutron_client)
- if not ext_net:
- self.step_failure("Failed to get external network")
-
cw.set_external_network_name(ext_net)
error = cw.deploy_vnf(self.vnf['blueprint'])
@@ -245,8 +232,8 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
mgr_ip = os.popen(cmd).read()
mgr_ip = mgr_ip.splitlines()[0]
except Exception:
- self.step_failure("Unable to retrieve the IP of the "
- "cloudify manager server !")
+ self.logger.exception("Unable to retrieve the IP of the "
+ "cloudify manager server !")
self.logger.info('Cloudify Manager: %s', mgr_ip)
api_url = 'http://{0}/api/v2/deployments/{1}/outputs'.format(
@@ -262,7 +249,7 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
if dns_ip != "":
vims_test_result = self.run_clearwater_live_test(
dns_ip=dns_ip,
- public_domain=self.inputs["public_domain"])
+ public_domain="") # self.inputs["public_domain"]
if vims_test_result != '':
return {'status': 'PASS', 'result': vims_test_result}
else:
@@ -273,19 +260,6 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
self.orchestrator['object'].undeploy_manager()
super(CloudifyIms, self).clean()
- def main(self, **kwargs):
- self.logger.info("Cloudify IMS VNF onboarding test starting")
- self.execute()
- self.logger.info("Cloudify IMS VNF onboarding test executed")
- if self.result is "PASS":
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- kwargs = {}
- return self.main(**kwargs)
-
# ----------------------------------------------------------
#
@@ -308,19 +282,3 @@ def get_config(parameter, file):
raise ValueError("The parameter %s is not defined in"
" reporting.yaml" % parameter)
return value
-
-
-def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
-
- image = os_utils.create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
-
- return image
diff --git a/functest/opnfv_tests/vnf/ims/opera_ims.py b/functest/opnfv_tests/vnf/ims/opera_ims.py
index 8c33d16e8..d420705aa 100644
--- a/functest/opnfv_tests/vnf/ims/opera_ims.py
+++ b/functest/opnfv_tests/vnf/ims/opera_ims.py
@@ -16,6 +16,7 @@ from opera import openo_connect
import requests
import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
+from functest.utils.constants import CONST
class OperaIms(clearwater_ims_base.ClearwaterOnBoardingBase):
@@ -25,9 +26,10 @@ class OperaIms(clearwater_ims_base.ClearwaterOnBoardingBase):
kwargs["case_name"] = "opera_ims"
super(OperaIms, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
- self.ellis_file = os.path.join(self.result_dir, 'ellis.info')
- self.live_test_file = os.path.join(self.result_dir,
- 'live_test_report.json')
+ self.ellis_file = os.path.join(
+ CONST.__getattribute__('dir_results'), 'ellis.info')
+ self.live_test_file = os.path.join(
+ CONST.__getattribute__('dir_results'), 'live_test_report.json')
try:
self.openo_msb_endpoint = os.environ['OPENO_MSB_ENDPOINT']
except KeyError:
diff --git a/functest/opnfv_tests/vnf/ims/orchestra_ims.py b/functest/opnfv_tests/vnf/ims/orchestra_ims.py
index 6f341970d..a5d2a9222 100755
--- a/functest/opnfv_tests/vnf/ims/orchestra_ims.py
+++ b/functest/opnfv_tests/vnf/ims/orchestra_ims.py
@@ -16,13 +16,14 @@ import time
import yaml
import functest.core.vnf as vnf
-import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
from functest.utils.constants import CONST
from org.openbaton.cli.agents.agents import MainAgent
from org.openbaton.cli.errors.errors import NfvoException
+
+__author__ = "Pauls, Michael <michael.pauls@fokus.fraunhofer.de>"
# ----------------------------------------------------------
#
# UTILS
@@ -30,7 +31,7 @@ from org.openbaton.cli.errors.errors import NfvoException
# -----------------------------------------------------------
-def get_config(parameter, file):
+def get_config(parameter, my_file):
"""
Returns the value of a given parameter in file.yaml
parameter must be given in string format with dots
@@ -44,25 +45,10 @@ def get_config(parameter, file):
value = value.get(element)
if value is None:
raise ValueError("The parameter %s is not defined in"
- " %s" % (parameter, file))
+ " %s" % (parameter, my_file))
return value
-def download_and_add_image_on_glance(glance, image_name,
- image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
- image = os_utils.create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
- return image
-
-
def servertest(host, port):
args = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
for family, socktype, proto, canonname, sockaddr in args:
@@ -77,20 +63,24 @@ def servertest(host, port):
class ImsVnf(vnf.VnfOnBoarding):
+ """OpenIMS VNF deployed with openBaton orchestrator"""
def __init__(self, project='functest', case_name='orchestra_ims',
repo='', cmd=''):
super(ImsVnf, self).__init__(project, case_name, repo, cmd)
+ self.logger = logging.getLogger(__name__)
+ self.logger.info("Orchestra IMS VNF onboarding test starting")
self.ob_password = "openbaton"
self.ob_username = "admin"
self.ob_https = False
self.ob_port = "8080"
self.ob_ip = "localhost"
self.ob_instance_id = ""
- self.logger = logging.getLogger(__name__)
- self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf/ims/')
- self.data_dir = CONST.dir_ims_data
- self.test_dir = CONST.dir_repo_vims_test
+ self.case_dir = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ 'vnf/ims/')
+ self.data_dir = CONST.__getattribute__('dir_ims_data')
+ self.test_dir = CONST.__getattribute__('dir_repo_vims_test')
self.ob_projectid = ""
self.keystone_client = os_utils.get_keystone_client()
self.ob_nsr_id = ""
@@ -118,7 +108,7 @@ class ImsVnf(vnf.VnfOnBoarding):
self.userdata_file = get_config("openbaton.userdata.file",
config_file)
- def deploy_orchestrator(self, **kwargs):
+ def deploy_orchestrator(self):
self.logger.info("Additional pre-configuration steps")
nova_client = os_utils.get_nova_client()
neutron_client = os_utils.get_neutron_client()
@@ -129,25 +119,28 @@ class ImsVnf(vnf.VnfOnBoarding):
self.logger.info("Upload some OS images if it doesn't exist")
temp_dir = os.path.join(self.data_dir, "tmp/")
for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s" % (image_name, image_url))
+ self.logger.info("image: %s, url: %s", image_name, image_url)
try:
image_id = os_utils.get_image_id(glance_client,
image_name)
- self.logger.info("image_id: %s" % image_id)
+ self.logger.info("image_id: %s", image_id)
except BaseException:
- self.logger.error("Unexpected error: %s" % sys.exc_info()[0])
+ self.logger.error("Unexpected error: %s", sys.exc_info()[0])
if image_id == '':
- self.logger.info("""%s image doesn't exist on glance repository. Try
- downloading this image and upload on glance !""" % image_name)
- image_id = download_and_add_image_on_glance(glance_client,
- image_name,
- image_url,
- temp_dir)
+ self.logger.info("""%s image doesn't exist on glance
+ repository. Try downloading this image
+ and upload on glance !""" % image_name)
+ image_id = os_utils.download_and_add_image_on_glance(
+ glance_client,
+ image_name,
+ image_url,
+ temp_dir)
if image_id == '':
- self.step_failure(
- "Failed to find or upload required OS "
- "image for this deployment")
+ self.logger.error("Failed to find or upload required OS "
+ "image for this deployment")
+ return False
+
network_dic = os_utils.create_network_full(neutron_client,
"openbaton_mgmt",
"openbaton_mgmt_subnet",
@@ -177,6 +170,7 @@ class ImsVnf(vnf.VnfOnBoarding):
if floatip is None:
self.logger.error("Cannot create floating IP.")
+ return False
userdata = "#!/bin/bash\n"
userdata += "echo \"Executing userdata...\"\n"
@@ -249,8 +243,10 @@ class ImsVnf(vnf.VnfOnBoarding):
self.logger.info("flavor: m1.medium\n"
"image: %s\n"
"network_id: %s\n"
- "userdata: %s\n"
- % (self.imagename, network_id, userdata))
+ "userdata: %s\n",
+ self.imagename,
+ network_id,
+ userdata)
instance = os_utils.create_instance_and_wait_for_active(
"orchestra",
@@ -266,10 +262,12 @@ class ImsVnf(vnf.VnfOnBoarding):
os_utils.add_secgroup_to_instance(nova_client,
self.ob_instance_id, sg_id)
- self.logger.info("Associating floating ip: '%s' to VM '%s' "
- % (floatip, "orchestra-openbaton"))
+ self.logger.info("Associating floating ip: '%s' to VM '%s' ",
+ floatip,
+ "orchestra-openbaton")
if not os_utils.add_floating_ip(nova_client, instance.id, floatip):
- self.step_failure("Cannot associate floating IP to VM.")
+ self.logger.error("Cannot associate floating IP to VM.")
+ return False
self.logger.info("Waiting for Open Baton NFVO to be up and running...")
x = 0
@@ -284,7 +282,7 @@ class ImsVnf(vnf.VnfOnBoarding):
x += 1
if x == 100:
- self.step_failure("Open Baton is not started correctly")
+ self.logger.error("Open Baton is not started correctly")
self.ob_ip = floatip
self.ob_password = "openbaton"
@@ -296,6 +294,7 @@ class ImsVnf(vnf.VnfOnBoarding):
self.details["orchestrator"] = {
'status': "PASS", 'result': "Deploy Open Baton NFVO: OK"}
self.logger.info("Deploy Open Baton NFVO: OK")
+ return True
def deploy_vnf(self):
self.logger.info("Starting vIMS Deployment...")
@@ -315,22 +314,22 @@ class ImsVnf(vnf.VnfOnBoarding):
'20',
'1',
public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
+ self.logger.debug("Flavor id: %s", flavor_id)
self.logger.info("Getting project 'default'...")
project_agent = self.main_agent.get_agent("project", self.ob_projectid)
for p in json.loads(project_agent.find()):
if p.get("name") == "default":
self.ob_projectid = p.get("id")
- self.logger.info("Found project 'default': %s" % p)
+ self.logger.info("Found project 'default': %s", p)
break
- self.logger.debug("project id: %s" % self.ob_projectid)
+ self.logger.debug("project id: %s", self.ob_projectid)
if self.ob_projectid == "":
- self.step_failure("Default project id was not found!")
+ self.logger.error("Default project id was not found!")
creds = os_utils.get_credentials()
- self.logger.info("PoP creds: %s" % creds)
+ self.logger.info("PoP creds: %s", creds)
if os_utils.is_keystone_v3():
self.logger.info(
@@ -343,7 +342,7 @@ class ImsVnf(vnf.VnfOnBoarding):
"Using v2 API of OpenStack... -> Using OS_TENANT_NAME")
project_id = creds.get("tenant_name")
- self.logger.debug("project id: %s" % project_id)
+ self.logger.debug("project id: %s", project_id)
vim_json = {
"name": "vim-instance",
@@ -363,7 +362,7 @@ class ImsVnf(vnf.VnfOnBoarding):
}
}
- self.logger.debug("Registering VIM: %s" % vim_json)
+ self.logger.debug("Registering VIM: %s", vim_json)
self.main_agent.get_agent(
"vim",
@@ -374,28 +373,29 @@ class ImsVnf(vnf.VnfOnBoarding):
nsd = {}
try:
- self.logger.info("sending: %s" % self.market_link)
+ self.logger.info("sending: %s", self.market_link)
nsd = market_agent.create(entity=self.market_link)
self.logger.info("Onboarded NSD: " + nsd.get("name"))
except NfvoException as e:
- self.step_failure(e.message)
+ self.logger.error(e.message)
nsr_agent = self.main_agent.get_agent("nsr",
project_id=self.ob_projectid)
nsd_id = nsd.get('id')
if nsd_id is None:
- self.step_failure("NSD not onboarded correctly")
+ self.logger.error("NSD not onboarded correctly")
try:
self.nsr = nsr_agent.create(nsd_id)
except NfvoException as e:
- self.step_failure(e.message)
+ self.logger.error(e.message)
if self.nsr.get('code') is not None:
self.logger.error(
- "vIMS cannot be deployed: %s -> %s" %
- (self.nsr.get('code'), self.nsr.get('message')))
- self.step_failure("vIMS cannot be deployed")
+ "vIMS cannot be deployed: %s -> %s",
+ self.nsr.get('code'),
+ self.nsr.get('message'))
+ self.logger.error("vIMS cannot be deployed")
i = 0
self.logger.info("Waiting for NSR to go to ACTIVE...")
@@ -403,8 +403,8 @@ class ImsVnf(vnf.VnfOnBoarding):
"status") != 'ERROR':
i += 1
if i == 150:
- self.step_failure("After %s sec the NSR did not go to ACTIVE.."
- % 5 * i)
+ self.logger.error("INACTIVE NSR after %s sec..", 5 * i)
+
time.sleep(5)
self.nsr = json.loads(nsr_agent.find(self.nsr.get('id')))
@@ -414,60 +414,66 @@ class ImsVnf(vnf.VnfOnBoarding):
else:
self.details["vnf"] = {'status': "FAIL", 'result': self.nsr}
self.logger.error(self.nsr)
- self.step_failure("Deploy VNF: ERROR")
+ self.logger.error("Deploy VNF: ERROR")
+ return False
+
self.ob_nsr_id = self.nsr.get("id")
self.logger.info(
"Sleep for 60s to ensure that all services are up and running...")
time.sleep(60)
- return self.details.get("vnf")
+ return True
def test_vnf(self):
# Adaptations probably needed
# code used for cloudify_ims
# ruby client on jumphost calling the vIMS on the SUT
self.logger.info(
- "Testing if %s works properly..." %
- self.nsr.get('name'))
+ "Testing if %s works properly...", self.nsr.get('name'))
for vnfr in self.nsr.get('vnfr'):
self.logger.info(
- "Checking ports %s of VNF %s" %
- (self.ims_conf.get(
- vnfr.get('name')).get('ports'),
- vnfr.get('name')))
+ "Checking ports %s of VNF %s",
+ self.ims_conf.get(vnfr.get('name')).get('ports'),
+ vnfr.get('name'))
for vdu in vnfr.get('vdu'):
for vnfci in vdu.get('vnfc_instance'):
self.logger.debug(
- "Checking ports of VNFC instance %s" %
+ "Checking ports of VNFC instance %s",
vnfci.get('hostname'))
for floatingIp in vnfci.get('floatingIps'):
self.logger.debug(
- "Testing %s:%s" %
- (vnfci.get('hostname'), floatingIp.get('ip')))
+ "Testing %s:%s",
+ vnfci.get('hostname'),
+ floatingIp.get('ip'))
for port in self.ims_conf.get(
vnfr.get('name')).get('ports'):
if servertest(floatingIp.get('ip'), port):
self.logger.info(
- "VNFC instance %s is reachable at %s:%s" %
- (vnfci.get('hostname'),
- floatingIp.get('ip'),
- port))
+ "VNFC instance %s is reachable at %s:%s",
+ vnfci.get('hostname'),
+ floatingIp.get('ip'),
+ port)
else:
self.logger.error(
"VNFC instance %s is not reachable "
- "at %s:%s" % (vnfci.get('hostname'),
- floatingIp.get('ip'), port))
+ "at %s:%s",
+ vnfci.get('hostname'),
+ floatingIp.get('ip'),
+ port)
self.details["test_vnf"] = {
'status': "FAIL", 'result': (
"Port %s of server %s -> %s is "
- "not reachable" %
- (port, vnfci.get('hostname'),
- floatingIp.get('ip')))}
- self.step_failure("Test VNF: ERROR")
+ "not reachable",
+ port,
+ vnfci.get('hostname'),
+ floatingIp.get('ip'))}
+ self.logger.error("Test VNF: ERROR")
+ return False
+
self.details["test_vnf"] = {
'status': "PASS",
'result': "All tests have been executed successfully"}
self.logger.info("Test VNF: OK")
- return self.details.get('test_vnf')
+ return True
def clean(self):
self.main_agent.get_agent(
@@ -476,28 +482,6 @@ class ImsVnf(vnf.VnfOnBoarding):
time.sleep(5)
os_utils.delete_instance(nova_client=os_utils.get_nova_client(),
instance_id=self.ob_instance_id)
- # TODO question is the clean removing also the VM?
+ # question is the clean removing also the VM?
# I think so since is goinf to remove the tenant...
super(ImsVnf, self).clean()
-
- def main(self, **kwargs):
- self.logger.info("Orchestra IMS VNF onboarding test starting")
- self.execute()
- self.logger.info("Orchestra IMS VNF onboarding test executed")
- if self.result is "PASS":
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- kwargs = {}
- return self.main(**kwargs)
-
-
-if __name__ == '__main__':
- logging.basicConfig()
- test = ImsVnf()
- test.deploy_orchestrator()
- test.deploy_vnf()
- test.test_vnf()
- test.clean()
diff --git a/functest/tests/unit/cli/commands/test_cli_env.py b/functest/tests/unit/cli/commands/test_cli_env.py
index 14e926eb9..b4e987871 100644
--- a/functest/tests/unit/cli/commands/test_cli_env.py
+++ b/functest/tests/unit/cli/commands/test_cli_env.py
@@ -8,7 +8,6 @@
import logging
import unittest
-from git.exc import NoSuchPathError
import mock
from functest.cli.commands import cli_env
@@ -72,42 +71,29 @@ class CliEnvTesting(unittest.TestCase):
mock_click_echo.assert_called_with(test_utils.
RegexMatch(reg_string))
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
def test_show_missing_ci_installer_type(self, *args):
self._test_show_missing_env_var('INSTALLER_TYPE', *args)
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
def test_show_missing_ci_installer_ip(self, *args):
self._test_show_missing_env_var('INSTALLER_IP', *args)
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
def test_show_missing_ci_scenario(self, *args):
self._test_show_missing_env_var('SCENARIO', *args)
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
def test_show_missing_ci_node(self, *args):
self._test_show_missing_env_var('NODE', *args)
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
def test_show_missing_ci_build_tag(self, *args):
self._test_show_missing_env_var('BUILD_TAG', *args)
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
def test_show_missing_ci_debug(self, *args):
self._test_show_missing_env_var('DEBUG', *args)
- @mock.patch('functest.cli.commands.cli_env.git.Repo')
@mock.patch('functest.cli.commands.cli_env.os.path.isfile',
return_value=False)
def test_show_missing_environment(self, *args):
self._test_show_missing_env_var('STATUS', *args)
- @mock.patch('functest.cli.commands.cli_env.os.path.exists',
- return_value=False)
- def test_show_missing_git_repo_dir(self, *args):
- CONST.__setattr__('dir_repo_functest', None)
- self.assertRaises(NoSuchPathError, lambda: self.cli_environ.show())
-
@mock.patch('functest.cli.commands.cli_env.click.echo')
@mock.patch('functest.cli.commands.cli_env.os.path.isfile',
return_value=True)
diff --git a/functest/tests/unit/core/test_unit.py b/functest/tests/unit/core/test_unit.py
index 79c4e7d7a..ca73de672 100644
--- a/functest/tests/unit/core/test_unit.py
+++ b/functest/tests/unit/core/test_unit.py
@@ -41,7 +41,9 @@ class PyTestSuiteRunnerTesting(unittest.TestCase):
self._test_run(result=mock_result,
status=testcase.TestCase.EX_RUN_ERROR)
self.assertEqual(self.psrunner.result, 0)
- self.assertEqual(self.psrunner.details, {'errors': [], 'failures': []})
+ self.assertEqual(self.psrunner.details,
+ {'errors': 0, 'failures': 0, 'stream': '',
+ 'testsRun': 0})
self.assertEqual(self.psrunner.is_successful(),
testcase.TestCase.EX_TESTCASE_FAILED)
@@ -52,8 +54,8 @@ class PyTestSuiteRunnerTesting(unittest.TestCase):
self._test_run(result=mock_result)
self.assertEqual(self.psrunner.result, 96)
self.assertEqual(self.psrunner.details,
- {'errors': [('test1', 'error_msg1')],
- 'failures': [('test2', 'failure_msg1')]})
+ {'errors': 1, 'failures': 1, 'stream': '',
+ 'testsRun': 50})
self.assertEqual(self.psrunner.is_successful(),
testcase.TestCase.EX_TESTCASE_FAILED)
@@ -62,7 +64,9 @@ class PyTestSuiteRunnerTesting(unittest.TestCase):
failures=[])
self._test_run(result=mock_result)
self.assertEqual(self.psrunner.result, 100)
- self.assertEqual(self.psrunner.details, {'errors': [], 'failures': []})
+ self.assertEqual(self.psrunner.details,
+ {'errors': 0, 'failures': 0, 'stream': '',
+ 'testsRun': 50})
self.assertEqual(self.psrunner.is_successful(),
testcase.TestCase.EX_OK)
diff --git a/functest/tests/unit/core/test_vnf.py b/functest/tests/unit/core/test_vnf.py
index ce8590400..f061c4096 100644
--- a/functest/tests/unit/core/test_vnf.py
+++ b/functest/tests/unit/core/test_vnf.py
@@ -10,156 +10,199 @@
# pylint: disable=missing-docstring
import logging
-import os
import unittest
import mock
from functest.core import vnf
from functest.core import testcase
+from functest.utils import constants
class VnfBaseTesting(unittest.TestCase):
+ """The class testing VNF."""
+ # pylint: disable=missing-docstring,too-many-public-methods
+
+ tenant_name = 'test_tenant_name'
+ tenant_description = 'description'
def setUp(self):
- self.test = vnf.VnfOnBoarding(
- project='functest', case_name='aaa')
- self.test.project = "functest"
- self.test.start_time = "1"
- self.test.stop_time = "5"
- self.test.result = ""
- self.test.details = {
- "orchestrator": {"status": "PASS", "result": "", "duration": 20},
- "vnf": {"status": "PASS", "result": "", "duration": 15},
- "test_vnf": {"status": "FAIL", "result": "", "duration": 5}}
- self.test.keystone_client = 'test_client'
- self.test.tenant_name = 'test_tenant_name'
-
- def test_execute_deploy_vnf_fail(self):
+ constants.CONST.__setattr__("vnf_foo_tenant_name", self.tenant_name)
+ constants.CONST.__setattr__(
+ "vnf_foo_tenant_description", self.tenant_description)
+ self.test = vnf.VnfOnBoarding(project='functest', case_name='foo')
+
+ def test_run_deploy_vnf_exc(self):
with mock.patch.object(self.test, 'prepare'),\
mock.patch.object(self.test, 'deploy_orchestrator',
return_value=None), \
mock.patch.object(self.test, 'deploy_vnf',
side_effect=Exception):
- self.assertEqual(self.test.execute(),
+ self.assertEqual(self.test.run(),
testcase.TestCase.EX_TESTCASE_FAILED)
- def test_execute_test_vnf_fail(self):
+ def test_run_test_vnf_exc(self):
with mock.patch.object(self.test, 'prepare'),\
mock.patch.object(self.test, 'deploy_orchestrator',
return_value=None), \
mock.patch.object(self.test, 'deploy_vnf'), \
mock.patch.object(self.test, 'test_vnf',
side_effect=Exception):
- self.assertEqual(self.test.execute(),
+ self.assertEqual(self.test.run(),
testcase.TestCase.EX_TESTCASE_FAILED)
- @mock.patch('functest.core.vnf.os_utils.get_tenant_id',
- return_value='test_tenant_id')
- @mock.patch('functest.core.vnf.os_utils.delete_tenant',
- return_value=True)
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_user_id')
- @mock.patch('functest.core.vnf.os_utils.delete_user',
- return_value=True)
- def test_execute_default(self, *args):
+ def test_run_deploy_orch_ko(self):
with mock.patch.object(self.test, 'prepare'),\
mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=None), \
- mock.patch.object(self.test, 'deploy_vnf'), \
- mock.patch.object(self.test, 'test_vnf'), \
- mock.patch.object(self.test, 'parse_results',
- return_value='ret_exit_code'), \
- mock.patch.object(self.test, 'log_results'):
- self.assertEqual(self.test.execute(),
- 'ret_exit_code')
-
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ return_value=False), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_vnf_deploy_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=False), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_vnf_test_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=False):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_default(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(), testcase.TestCase.EX_OK)
+
+ def test_deploy_vnf_unimplemented(self):
+ with self.assertRaises(vnf.VnfDeploymentException):
+ self.test.deploy_vnf()
+
+ def test_test_vnf_unimplemented(self):
+ with self.assertRaises(vnf.VnfTestException):
+ self.test.test_vnf()
+
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id', return_value='')
- def test_prepare_missing_userid(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
+ @mock.patch('functest.core.vnf.os_utils.delete_user',
+ return_value=True)
+ def test_clean_user_set(self, *args):
+ self.test.user_created = True
+ self.test.clean()
+ args[0].assert_called_once_with(mock.ANY, self.tenant_name)
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='')
- def test_prepare_missing_tenantid(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
+ @mock.patch('functest.core.vnf.os_utils.delete_user',
+ return_value=True)
+ def test_clean_user_unset(self, *args):
+ self.test.user_created = False
+ self.test.clean()
+ args[0].assert_not_called()
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='test_tenantid')
- @mock.patch('functest.core.vnf.os_utils.get_role_id',
- return_value='')
- def test_prepare_missing_roleid(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
+ @mock.patch('functest.core.vnf.os_utils.delete_tenant',
+ return_value=True)
+ def test_clean_tenant_set(self, *args):
+ self.test.tenant_created = True
+ self.test.clean()
+ args[0].assert_called_once_with(mock.ANY, self.tenant_name)
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='test_tenantid')
- @mock.patch('functest.core.vnf.os_utils.get_role_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.add_role_user',
- return_value='')
- def test_prepare_role_add_failure(self, *args):
- with self.assertRaises(Exception):
+ @mock.patch('functest.core.vnf.os_utils.delete_tenant',
+ return_value=True)
+ def test_clean_tenant_unset(self, *args):
+ self.test.tenant_created = False
+ self.test.clean()
+ args[0].assert_not_called()
+ args[1].assert_called_once_with()
+
+ def test_deploy_orch_unimplemented(self):
+ self.assertTrue(self.test.deploy_orchestrator())
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value={'creds': 'test'})
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client',
+ return_value='test')
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf',
+ return_value=0)
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf',
+ return_value=0)
+ def test_prepare(self, *args):
+ self.assertEqual(self.test.prepare(),
+ testcase.TestCase.EX_OK)
+ args[0].assert_called_once_with('test', self.tenant_name)
+ args[1].assert_called_once_with(
+ 'test', self.tenant_name, self.tenant_description)
+ args[2].assert_called_once_with()
+ args[3].assert_called_once_with()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ side_effect=Exception)
+ def test_prepare_admin_creds_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
self.test.prepare()
+ args[0].assert_called_once_with()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value='creds')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client',
+ side_effect=Exception)
+ def test_prepare_keystone_client_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
+ self.test.prepare()
+ args[0].assert_called_once_with()
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value='creds')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='test_tenantid')
- @mock.patch('functest.core.vnf.os_utils.get_role_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.add_role_user')
- @mock.patch('functest.core.vnf.os_utils.create_user',
- return_value='')
- def test_create_user_failure(self, *args):
- with self.assertRaises(Exception):
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf',
+ side_effect=Exception)
+ def test_prepare_tenant_creation_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
self.test.prepare()
+ args[0].assert_called_once_with(
+ mock.ANY, self.tenant_name, self.tenant_description)
+ args[1].assert_called_once_with()
+ args[2].assert_called_once_with()
- def test_log_results_default(self):
- with mock.patch('functest.core.vnf.'
- 'ft_utils.logger_test_results') \
- as mock_method:
- self.test.log_results()
- self.assertTrue(mock_method.called)
-
- def test_step_failures_default(self):
- with self.assertRaises(Exception):
- self.test.step_failure("error_msg")
-
- def test_deploy_vnf_unimplemented(self):
- with self.assertRaises(Exception) as context:
- self.test.deploy_vnf()
- self.assertIn('VNF not deployed', str(context.exception))
-
- def test_test_vnf_unimplemented(self):
- with self.assertRaises(Exception) as context:
- self.test.test_vnf()()
- self.assertIn('VNF not tested', str(context.exception))
-
- def test_parse_results_ex_ok(self):
- self.test.details['test_vnf']['status'] = 'PASS'
- self.assertEqual(self.test.parse_results(), os.EX_OK)
-
- def test_parse_results_ex_run_error(self):
- self.test.details['vnf']['status'] = 'FAIL'
- self.assertEqual(self.test.parse_results(), os.EX_SOFTWARE)
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value='creds')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf',
+ return_value=0)
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf',
+ side_effect=Exception)
+ def test_prepare_user_creation_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
+ self.test.prepare()
+ args[0].assert_called_once_with(mock.ANY, self.tenant_name)
+ args[1].assert_called_once_with(
+ mock.ANY, self.tenant_name, self.tenant_description)
+ args[2].assert_called_once_with()
+ args[3].assert_called_once_with()
if __name__ == "__main__":
diff --git a/functest/tests/unit/utils/test_functest_utils.py b/functest/tests/unit/utils/test_functest_utils.py
index d84a32013..fbfa420aa 100644
--- a/functest/tests/unit/utils/test_functest_utils.py
+++ b/functest/tests/unit/utils/test_functest_utils.py
@@ -8,11 +8,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
import logging
+import pkg_resources
import os
import time
import unittest
-from git.exc import NoSuchPathError
import mock
import requests
from six.moves import urllib
@@ -56,8 +56,8 @@ class FunctestUtilsTesting(unittest.TestCase):
self.testcase_dict = {'case_name': 'testname',
'criteria': self.criteria}
self.parameter = 'general.openstack.image_name'
- self.config_yaml = os.path.normpath(os.path.join(os.path.dirname(
- os.path.abspath(__file__)), '../../../ci/config_functest.yaml'))
+ self.config_yaml = pkg_resources.resource_filename(
+ 'functest', 'ci/config_functest.yaml')
self.db_url_env = 'http://foo/testdb'
self.testcases_yaml = "test_testcases_yaml"
self.file_yaml = {'general': {'openstack': {'image_name':
@@ -97,27 +97,6 @@ class FunctestUtilsTesting(unittest.TestCase):
m.assert_called_once_with(dest, 'wb')
self.assertTrue(mock_sh.called)
- def test_get_git_branch(self):
- with mock.patch('functest.utils.functest_utils.Repo') as mock_repo:
- mock_obj2 = mock.Mock()
- attrs = {'name': 'test_branch'}
- mock_obj2.configure_mock(**attrs)
-
- mock_obj = mock.Mock()
- attrs = {'active_branch': mock_obj2}
- mock_obj.configure_mock(**attrs)
-
- mock_repo.return_value = mock_obj
- self.assertEqual(functest_utils.get_git_branch(self.repo_path),
- 'test_branch')
-
- @mock.patch('functest.utils.functest_utils.Repo',
- side_effect=NoSuchPathError)
- def test_get_git_branch_failed(self, mock_repo):
- self.assertRaises(NoSuchPathError,
- lambda: functest_utils.get_git_branch(self.repo_path
- ))
-
@mock.patch('functest.utils.functest_utils.logger.error')
def test_get_installer_type_failed(self, mock_logger_error):
with mock.patch.dict(os.environ,
diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py
index 15b540577..0f06b1e17 100644
--- a/functest/tests/unit/utils/test_openstack_utils.py
+++ b/functest/tests/unit/utils/test_openstack_utils.py
@@ -1835,6 +1835,77 @@ class OSUtilsTesting(unittest.TestCase):
None)
self.assertTrue(mock_logger_error.called)
+ def test_get_or_create_user_for_vnf_get(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ return_value='user_id'), \
+ mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ return_value='tenant_id'):
+ self.assertFalse(openstack_utils.
+ get_or_create_user_for_vnf(self.keystone_client,
+ 'my_vnf'))
+
+ def test_get_or_create_user_for_vnf_create(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ return_value=None), \
+ mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ return_value='tenant_id'):
+ self.assertTrue(openstack_utils.
+ get_or_create_user_for_vnf(self.keystone_client,
+ 'my_vnf'))
+
+ def test_get_or_create_user_for_vnf_error_get_user_id(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ side_effect=Exception):
+ self.assertRaises(Exception)
+
+ def test_get_or_create_user_for_vnf_error_get_tenant_id(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ return_value='user_id'), \
+ mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ side_effect='Exception'):
+ self.assertRaises(Exception)
+
+ def test_get_or_create_tenant_for_vnf_get(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_tenant_id',
+ return_value='tenant_id'):
+ self.assertFalse(
+ openstack_utils.get_or_create_tenant_for_vnf(
+ self.keystone_client, 'tenant_name', 'tenant_description'))
+
+ def test_get_or_create_tenant_for_vnf_create(self):
+ with mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ return_value=None):
+ self.assertTrue(
+ openstack_utils.get_or_create_tenant_for_vnf(
+ self.keystone_client, 'tenant_name', 'tenant_description'))
+
+ def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_tenant_id',
+ side_effect=Exception):
+ self.assertRaises(Exception)
+
+ def test_download_and_add_image_on_glance_image_creation_failure(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'os.makedirs'), \
+ mock.patch('functest.utils.openstack_utils.'
+ 'ft_utils.download_url',
+ return_value=True), \
+ mock.patch('functest.utils.openstack_utils.'
+ 'create_glance_image',
+ return_value=''):
+ resp = openstack_utils.download_and_add_image_on_glance(
+ self.glance_client,
+ 'image_name',
+ 'http://url',
+ 'data_dir')
+ self.assertEqual(resp, False)
+
if __name__ == "__main__":
logging.disable(logging.CRITICAL)
diff --git a/functest/tests/unit/vnf/ims/test_cloudify_ims.py b/functest/tests/unit/vnf/ims/test_cloudify_ims.py
index c3c04e1d9..2156a122f 100644
--- a/functest/tests/unit/vnf/ims/test_cloudify_ims.py
+++ b/functest/tests/unit/vnf/ims/test_cloudify_ims.py
@@ -69,7 +69,7 @@ class CloudifyImsTesting(unittest.TestCase):
mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
'os_utils.get_image_id',
return_value=''), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
+ mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.os_utils.'
'download_and_add_image_on_glance') as m, \
self.assertRaises(Exception) as context:
self.ims_vnf.deploy_orchestrator()
@@ -458,35 +458,6 @@ class CloudifyImsTesting(unittest.TestCase):
{'status': 'PASS',
'result': 'vims_test_result'})
- def test_download_and_add_image_on_glance_incorrect_url(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.makedirs'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'ft_utils.download_url',
- return_value=False):
- resp = cloudify_ims.download_and_add_image_on_glance(self.
- glance_client,
- 'image_name',
- 'http://url',
- 'data_dir')
- self.assertEqual(resp, False)
-
- def test_download_and_add_image_on_glance_image_creation_failure(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.makedirs'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'ft_utils.download_url',
- return_value=True), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os_utils.create_glance_image',
- return_value=''):
- resp = cloudify_ims.download_and_add_image_on_glance(self.
- glance_client,
- 'image_name',
- 'http://url',
- 'data_dir')
- self.assertEqual(resp, False)
-
if __name__ == "__main__":
logging.disable(logging.CRITICAL)
diff --git a/functest/tests/unit/vnf/ims/test_ims_base.py b/functest/tests/unit/vnf/ims/test_ims_base.py
index db5b18d71..66d35e39f 100644
--- a/functest/tests/unit/vnf/ims/test_ims_base.py
+++ b/functest/tests/unit/vnf/ims/test_ims_base.py
@@ -35,23 +35,6 @@ class ClearwaterOnBoardingBaseTesting(unittest.TestCase):
'cookies': ""}
self.mock_post_200.configure_mock(**attrs)
- def test_create_ellis_number_failure(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.'
- 'clearwater_ims_base.requests.post',
- return_value=self.mock_post_500), \
- self.assertRaises(Exception) as context:
- self.ims_vnf.create_ellis_number()
-
- msg = "Unable to create a number:"
- self.assertTrue(msg in context.exception)
-
- def _get_post_status(self, url, cookies='', data=''):
- ellis_url = "http://test_ellis_ip/session"
- if url == ellis_url:
- return self.mock_post_200
- return self.mock_post
-
-
if __name__ == "__main__":
logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
diff --git a/functest/utils/env.py b/functest/utils/env.py
index 3724ec998..0174588d7 100644
--- a/functest/utils/env.py
+++ b/functest/utils/env.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+import pkg_resources
import os
import re
@@ -16,8 +17,8 @@ default_envs = {
'BUILD_TAG': None,
'OS_ENDPOINT_TYPE': None,
'OS_AUTH_URL': None,
- 'CONFIG_FUNCTEST_YAML': os.path.normpath(os.path.join(os.path.dirname(
- os.path.abspath(__file__)), '../ci/config_functest.yaml'))
+ 'CONFIG_FUNCTEST_YAML': pkg_resources.resource_filename(
+ 'functest', 'ci/config_functest.yaml')
}
diff --git a/functest/utils/functest_utils.py b/functest/utils/functest_utils.py
index 995d94ff0..d8cfabce0 100644
--- a/functest/utils/functest_utils.py
+++ b/functest/utils/functest_utils.py
@@ -22,7 +22,6 @@ import dns.resolver
import requests
from six.moves import urllib
import yaml
-from git import Repo
from functest.utils import constants
from functest.utils import decorators
@@ -68,15 +67,6 @@ def download_url(url, dest_path):
# CI UTILS
#
# -----------------------------------------------------------
-def get_git_branch(repo_path):
- """
- Get git branch name
- """
- repo = Repo(repo_path)
- branch = repo.active_branch
- return branch.name
-
-
def get_installer_type():
"""
Get installer type (fuel, apex, joid, compass)
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
index 57a2aa2be..f155449d5 100644
--- a/functest/utils/openstack_utils.py
+++ b/functest/utils/openstack_utils.py
@@ -279,6 +279,22 @@ def get_heat_client(other_creds={}):
return heatclient.Client(get_heat_client_version(), session=sess)
+def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
+ dest_path = data_dir
+ if not os.path.exists(dest_path):
+ os.makedirs(dest_path)
+ file_name = image_url.rsplit('/')[-1]
+ if not ft_utils.download_url(image_url, dest_path):
+ return False
+
+ image = create_glance_image(
+ glance, image_name, dest_path + file_name)
+ if not image:
+ return False
+
+ return image
+
+
# *********************************************
# NOVA
# *********************************************
@@ -1412,6 +1428,32 @@ def get_or_create_tenant(keystone_client, tenant_name, tenant_description):
return tenant_id
+def get_or_create_tenant_for_vnf(keystone_client, tenant_name,
+ tenant_description):
+ """Get or Create a Tenant
+
+ Args:
+ keystone_client: keystone client reference
+ tenant_name: the name of the tenant
+ tenant_description: the description of the tenant
+
+ return False if tenant retrieved though get
+ return True if tenant created
+ raise Exception if error during processing
+ """
+ try:
+ tenant_id = get_tenant_id(keystone_client, tenant_name)
+ if not tenant_id:
+ tenant_id = create_tenant(keystone_client, tenant_name,
+ tenant_description)
+ return True
+ else:
+ return False
+ except:
+ raise Exception("Impossible to create a Tenant for the VNF {}".format(
+ tenant_name))
+
+
def create_user(keystone_client, user_name, user_password,
user_email, tenant_id):
try:
@@ -1444,6 +1486,32 @@ def get_or_create_user(keystone_client, user_name, user_password,
return user_id
+def get_or_create_user_for_vnf(keystone_client, vnf_ref):
+ """Get or Create user for VNF
+
+ Args:
+ keystone_client: keystone client reference
+ vnf_ref: VNF reference used as user name & password, tenant name
+
+ return False if user retrieved through get
+ return True if user created
+ raise Exception if error during processing
+ """
+ try:
+ user_id = get_user_id(keystone_client, vnf_ref)
+ tenant_id = get_tenant_id(keystone_client, vnf_ref)
+ if not user_id:
+ user_id = create_user(keystone_client, vnf_ref, vnf_ref,
+ "", tenant_id)
+ return True
+ else:
+ return False
+ add_role_user(keystone_client, user_id, 'admin', vnf_ref)
+ except:
+ raise Exception("Impossible to create a user for the VNF {}".format(
+ vnf_ref))
+
+
def add_role_user(keystone_client, user_id, role_id, tenant_id):
try:
if is_keystone_v3():
diff --git a/requirements.txt b/requirements.txt
index 787aeaf4a..10d83e18e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,42 +1,30 @@
-#
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-pyyaml==3.10
-gitpython==1.0.1
-python-openstackclient==2.3.0
-python-ceilometerclient==2.6.2
-python-heatclient==1.7.0
-python-keystoneclient==3.5.0
-python-neutronclient==6.0.0
-python-novaclient==6.0.0
-python-congressclient==1.5.0
-python-tackerclient==0.7.0
-pexpect==4.0
-requests==2.9.1
+pbr>=1.8 # Apache-2.0
+PyYAML>=3.10.0 # MIT
+GitPython>=1.0.1 # BSD License (3 clause)
+keystoneauth1>=2.18.0 # Apache-2.0
+python-cinderclient!=1.7.0,!=1.7.1,>=1.6.0 # Apache-2.0
+python-glanceclient>=2.5.0 # Apache-2.0
+python-heatclient>=1.6.1 # Apache-2.0
+python-keystoneclient>=3.8.0 # Apache-2.0
+python-neutronclient>=5.1.0 # Apache-2.0
+python-novaclient!=7.0.0,>=6.0.0 # Apache-2.0
+python-tackerclient>=0.8.0 # Apache-2.0
+pexpect!=3.3,>=3.1 # ISC License
+requests!=2.12.2,>=2.10.0 # Apache-2.0
robotframework==3.0.2
robotframework-httplibrary==0.4.2
robotframework-requests==0.4.7
-robotframework-sshlibrary==2.1.3; python_version=='2.7'
-jmespath==0.9.2
-configObj==5.0.6
-Flask==0.10.1
-xmltodict==0.9.2
+robotframework-sshlibrary==2.1.3;python_version=='2.7'
scp==0.10.2
-paramiko==2.1.2
-subprocess32; python_version=='2.7'
-shyaml
-dnspython
-Pillow==3.3.0
+subprocess32;python_version=='2.7'
+dnspython>=1.14.0;python_version=='2.7' # http://www.dnspython.org/LICENSE
+dnspython3!=1.13.0,!=1.14.0,>=1.12.0;python_version>='3.0' # http://www.dnspython.org/LICENSE
click==6.6
openbaton-cli==2.2.1-beta7
-mock==1.3.0
+mock>=2.0 # BSD
iniparse==0.4
-PrettyTable>=0.7.1,<0.8 # BSD
+PrettyTable<0.8,>=0.7.1 # BSD
six>=1.9.0 # MIT
opnfv
-baro_tests
+baro-tests
snaps
diff --git a/setup.py b/setup.py
index 460651cfd..a52d90555 100644
--- a/setup.py
+++ b/setup.py
@@ -9,9 +9,16 @@
# pylint: disable=missing-docstring
-from setuptools import setup
+import setuptools
-setup(
- setup_requires=['pbr>=1.9', 'setuptools>=17.1'],
- pbr=True,
-)
+# In python < 2.7.4, a lazy loading of package `pbr` will break
+# setuptools if some other modules registered functions in `atexit`.
+# solution from: http://bugs.python.org/issue15881#msg170215
+try:
+ import multiprocessing # noqa
+except ImportError:
+ pass
+
+setuptools.setup(
+ setup_requires=['pbr>=1.8'],
+ pbr=True)
diff --git a/test-requirements.txt b/test-requirements.txt
index 9fe4bc749..f22863c7d 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,13 +1,6 @@
-#
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-coverage==4.1
-mock==1.3.0
-nose==1.3.7
-flake8>=2.5.4,<2.6.0 # MIT
+coverage>=4.0 # Apache-2.0
+mock>=2.0 # BSD
+nose # LGPL
+flake8<2.6.0,>=2.5.4 # MIT
pylint==1.4.5 # GPLv2
-sphinx!=1.6.1,>=1.5.1 # BSD
+sphinx!=1.3b1,<1.4,>=1.2.1 # BSD
diff --git a/tox.ini b/tox.ini
index 0b3e7782f..c8d45457c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -29,13 +29,9 @@ commands = flake8
basepython = python2.7
whitelist_externals = bash
modules =
- functest.core.feature
- functest.core.testcase
- functest.core.unit
+ functest.core
functest.opnfv_tests.sdn.odl
- functest.tests.unit.core.test_feature
- functest.tests.unit.core.test_testcase
- functest.tests.unit.core.test_unit
+ functest.tests.unit.core
functest.tests.unit.odl
functest.tests.unit.utils.test_decorators
functest.utils.decorators