aboutsummaryrefslogtreecommitdiffstats
path: root/functest
diff options
context:
space:
mode:
Diffstat (limited to 'functest')
-rwxr-xr-xfunctest/ci/config_functest.yaml1
-rwxr-xr-xfunctest/ci/testcases.yaml88
-rw-r--r--functest/core/feature.py (renamed from functest/core/feature_base.py)4
-rw-r--r--functest/core/vnf_base.py26
-rw-r--r--functest/opnfv_tests/features/barometer.py4
-rw-r--r--functest/opnfv_tests/features/copper.py4
-rw-r--r--functest/opnfv_tests/features/doctor.py4
-rw-r--r--functest/opnfv_tests/features/domino.py4
-rw-r--r--functest/opnfv_tests/features/netready.py4
-rw-r--r--functest/opnfv_tests/features/odl_sfc.py10
-rw-r--r--functest/opnfv_tests/features/promise.py4
-rw-r--r--functest/opnfv_tests/features/sdnvpn.py4
-rw-r--r--functest/opnfv_tests/features/security_scan.py4
-rw-r--r--functest/opnfv_tests/mano/orchestra.py4
-rwxr-xr-xfunctest/opnfv_tests/openstack/refstack_client/refstack_client.py51
-rwxr-xr-xfunctest/opnfv_tests/openstack/refstack_client/tempest_conf.py53
-rw-r--r--functest/opnfv_tests/openstack/tempest/custom_tests/blacklist.txt32
-rwxr-xr-xfunctest/opnfv_tests/sdn/odl/odl.py69
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater_ims_base.py148
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.py142
-rw-r--r--[-rwxr-xr-x]functest/opnfv_tests/vnf/ims/opera_ims.py457
-rw-r--r--functest/opnfv_tests/vnf/rnc/parser.py4
-rw-r--r--functest/opnfv_tests/vnf/router/vyos_vrouter.py4
-rw-r--r--functest/tests/unit/ci/test_prepare_env.py111
-rw-r--r--functest/tests/unit/ci/test_run_tests.py84
-rw-r--r--functest/tests/unit/ci/test_tier_handler.py9
-rw-r--r--functest/tests/unit/core/test_testcase.py (renamed from functest/tests/unit/core/test_testcase_base.py)7
-rw-r--r--functest/tests/unit/odl/test_odl.py371
-rw-r--r--functest/tests/unit/opnfv_tests/openstack/tempest/test_conf_utils.py222
-rw-r--r--functest/tests/unit/opnfv_tests/openstack/tempest/test_tempest.py51
-rw-r--r--functest/tests/unit/opnfv_tests/vnf/ims/test_clearwater.py34
-rw-r--r--functest/tests/unit/opnfv_tests/vnf/ims/test_cloudify_ims.py118
-rw-r--r--functest/tests/unit/opnfv_tests/vnf/ims/test_ims_base.by58
-rw-r--r--functest/tests/unit/opnfv_tests/vnf/ims/test_orchestrator_cloudify.py55
-rw-r--r--functest/tests/unit/utils/test_functest_logger.py48
-rw-r--r--functest/tests/unit/utils/test_openstack_tacker.py92
-rw-r--r--functest/tests/unit/utils/test_openstack_utils.py55
-rw-r--r--functest/utils/openstack_utils.py34
38 files changed, 1603 insertions, 871 deletions
diff --git a/functest/ci/config_functest.yaml b/functest/ci/config_functest.yaml
index 78f6257c8..95a4408a1 100755
--- a/functest/ci/config_functest.yaml
+++ b/functest/ci/config_functest.yaml
@@ -27,6 +27,7 @@ general:
repo_parser: /home/opnfv/repos/parser
repo_domino: /home/opnfv/repos/domino
repo_snaps: /home/opnfv/repos/snaps
+ repo_opera: /home/opnfv/repos/opera
repo_fds: /home/opnfv/repos/fds
repo_securityscan: /home/opnfv/repos/securityscanning
repo_vrouter: /home/opnfv/repos/vnfs/vrouter
diff --git a/functest/ci/testcases.yaml b/functest/ci/testcases.yaml
index 9a62770e9..23b214bb7 100755
--- a/functest/ci/testcases.yaml
+++ b/functest/ci/testcases.yaml
@@ -102,7 +102,7 @@ tiers:
name: tempest_smoke_serial
criteria: 'success_rate == 100%'
blocking: false
- clean_flag: false
+ clean_flag: true
description: >-
This test case runs the smoke subset of the OpenStack
Tempest suite. The list of test cases is generated by
@@ -134,7 +134,7 @@ tiers:
name: refstack_defcore
criteria: 'success_rate == 100%'
blocking: false
- clean_flag: false
+ clean_flag: true
description: >-
This test case runs a sub group of tests of the OpenStack
Defcore testcases by using refstack client.
@@ -306,19 +306,7 @@ tiers:
run:
module: 'functest.opnfv_tests.features.security_scan'
class: 'SecurityScan'
-# -
-# name: copper
-# criteria: 'status == "PASS"'
-# blocking: false
-# clean_flag: true
-# description: >-
-# Test suite for policy management based on OpenStack Congress
-# dependencies:
-# installer: '(apex)|(joid)'
-# scenario: '^((?!fdio|lxd).)*$'
-# run:
-# module: 'functest.opnfv_tests.features.copper'
-# class: 'Copper'
+
-
name: multisite
criteria: 'success_rate == 100%'
@@ -416,30 +404,30 @@ tiers:
-
name: components
order: 3
- ci_loop: 'daily'
+ ci_loop: 'weekly'
description : >-
Extensive testing of OpenStack API.
testcases:
-# -
-# name: tempest_full_parallel
-# criteria: 'success_rate >= 80%'
-# blocking: false
-# clean_flag: false
-# description: >-
-# The list of test cases is generated by
-# Tempest automatically and depends on the parameters of
-# the OpenStack deplopyment.
-# dependencies:
-# installer: '^((?!netvirt).)*$'
-# scenario: ''
-# run:
-# module: 'functest.opnfv_tests.openstack.tempest.tempest'
-# class: 'TempestFullParallel'
+ -
+ name: tempest_full_parallel
+ criteria: 'success_rate >= 80%'
+ blocking: false
+ clean_flag: true
+ description: >-
+ The list of test cases is generated by
+ Tempest automatically and depends on the parameters of
+ the OpenStack deplopyment.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: ''
+ run:
+ module: 'functest.opnfv_tests.openstack.tempest.tempest'
+ class: 'TempestFullParallel'
-
name: tempest_custom
criteria: 'success_rate == 100%'
blocking: false
- clean_flag: false
+ clean_flag: true
description: >-
The test case allows running a customized list of tempest
test cases defined in a file under
@@ -452,20 +440,20 @@ tiers:
run:
module: 'functest.opnfv_tests.openstack.tempest.tempest'
class: 'TempestCustom'
-# -
-# name: rally_full
-# criteria: 'success_rate >= 90%'
-# blocking: false
-# clean_flag: false
-# description: >-
-# This test case runs the full suite of scenarios of the OpenStack
-# Rally suite using several threads and iterations.
-# dependencies:
-# installer: '^((?!netvirt).)*$'
-# scenario: ''
-# run:
-# module: 'functest.opnfv_tests.openstack.rally.rally'
-# class: 'RallyFull'
+ -
+ name: rally_full
+ criteria: 'success_rate >= 90%'
+ blocking: false
+ clean_flag: false
+ description: >-
+ This test case runs the full suite of scenarios of the OpenStack
+ Rally suite using several threads and iterations.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: ''
+ run:
+ module: 'functest.opnfv_tests.openstack.rally.rally'
+ class: 'RallyFull'
-
name: vnf
@@ -487,7 +475,7 @@ tiers:
scenario: '(ocl)|(nosdn)|^(os-odl)((?!bgpvpn).)*$'
run:
module: 'functest.opnfv_tests.vnf.ims.cloudify_ims'
- class: 'ImsVnf'
+ class: 'CloudifyIms'
# -
# name: aaa
# criteria: 'ret == 0'
@@ -517,17 +505,17 @@ tiers:
-
name: opera_ims
- criteria: 'ret == 0'
+ criteria: 'status == "PASS"'
blocking: false
clean_flag: true
description: >-
- Evolution of vIMS
+ VNF deployment with OPEN-O
dependencies:
installer: 'unknown'
scenario: 'unknown'
run:
module: 'functest.opnfv_tests.vnf.ims.opera_ims'
- class: 'ImsVnf'
+ class: 'OperaIms'
-
name: vyos_vrouter
diff --git a/functest/core/feature_base.py b/functest/core/feature.py
index f7f3e4605..325c10d49 100644
--- a/functest/core/feature_base.py
+++ b/functest/core/feature.py
@@ -6,10 +6,10 @@ import functest.utils.functest_logger as ft_logger
from functest.utils.constants import CONST
-class FeatureBase(base.TestCase):
+class Feature(base.TestCase):
def __init__(self, project='functest', case='', repo='', cmd=''):
- super(FeatureBase, self).__init__()
+ super(Feature, self).__init__()
self.project_name = project
self.case_name = case
self.cmd = cmd
diff --git a/functest/core/vnf_base.py b/functest/core/vnf_base.py
index c374c4917..3f0adcc66 100644
--- a/functest/core/vnf_base.py
+++ b/functest/core/vnf_base.py
@@ -7,15 +7,14 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-import time
-
import inspect
+import time
+import functest.core.testcase as base
+from functest.utils.constants import CONST
import functest.utils.functest_logger as ft_logger
-import functest.utils.openstack_utils as os_utils
import functest.utils.functest_utils as ft_utils
-import testcase as base
-from functest.utils.constants import CONST
+import functest.utils.openstack_utils as os_utils
class VnfOnBoardingBase(base.TestCase):
@@ -29,7 +28,7 @@ class VnfOnBoardingBase(base.TestCase):
self.case_name = case
self.cmd = cmd
self.details = {}
- self.data_dir = CONST.dir_functest_data
+ self.result_dir = CONST.dir_results
self.details['orchestrator'] = {}
self.details['vnf'] = {}
self.details['test_vnf'] = {}
@@ -39,14 +38,14 @@ class VnfOnBoardingBase(base.TestCase):
'vnf_{}_tenant_name'.format(self.case_name))
self.tenant_description = CONST.__getattribute__(
'vnf_{}_tenant_description'.format(self.case_name))
- except:
+ except Exception:
# raise Exception("Unknown VNF case=" + self.case_name)
self.logger.error("Unknown VNF case={}".format(self.case_name))
try:
self.images = CONST.__getattribute__(
'vnf_{}_tenant_images'.format(self.case_name))
- except:
+ except Exception:
self.logger.warn("No tenant image defined for this VNF")
def execute(self):
@@ -234,7 +233,12 @@ class VnfOnBoardingBase(base.TestCase):
def step_failure(self, error_msg):
part = inspect.stack()[1][3]
- self.details[part]['status'] = 'FAIL'
- self.details[part]['result'] = error_msg
- self.logger.error("Step failure:{}".format(error_msg))
+ self.logger.error("Step '%s' failed: %s", part, error_msg)
+ try:
+ part_info = self.details[part]
+ except KeyError:
+ self.details[part] = {}
+ part_info = self.details[part]
+ part_info['status'] = 'FAIL'
+ part_info['result'] = error_msg
raise Exception(error_msg)
diff --git a/functest/opnfv_tests/features/barometer.py b/functest/opnfv_tests/features/barometer.py
index 32067284b..6011340f8 100644
--- a/functest/opnfv_tests/features/barometer.py
+++ b/functest/opnfv_tests/features/barometer.py
@@ -8,10 +8,10 @@
from baro_tests import collectd
-import functest.core.feature_base as base
+import functest.core.feature as base
-class BarometerCollectd(base.FeatureBase):
+class BarometerCollectd(base.Feature):
'''
Class for executing barometercollectd testcase.
'''
diff --git a/functest/opnfv_tests/features/copper.py b/functest/opnfv_tests/features/copper.py
index 735b315d2..689341eab 100644
--- a/functest/opnfv_tests/features/copper.py
+++ b/functest/opnfv_tests/features/copper.py
@@ -14,10 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class Copper(base.FeatureBase):
+class Copper(base.Feature):
def __init__(self):
super(Copper, self).__init__(project='copper',
case='copper-notification',
diff --git a/functest/opnfv_tests/features/doctor.py b/functest/opnfv_tests/features/doctor.py
index 4d295a674..d32bbfc99 100644
--- a/functest/opnfv_tests/features/doctor.py
+++ b/functest/opnfv_tests/features/doctor.py
@@ -13,10 +13,10 @@
# 0.2: measure test duration and publish results under json format
#
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class Doctor(base.FeatureBase):
+class Doctor(base.Feature):
def __init__(self):
super(Doctor, self).__init__(project='doctor',
case='doctor-notification',
diff --git a/functest/opnfv_tests/features/domino.py b/functest/opnfv_tests/features/domino.py
index b36220fa0..e34429bc2 100644
--- a/functest/opnfv_tests/features/domino.py
+++ b/functest/opnfv_tests/features/domino.py
@@ -14,10 +14,10 @@
# 0.3: add report flag to push results when needed
# 0.4: refactoring to match Test abstraction class
-import functest.core.feature_base as base
+import functest.core.feature as base
-class Domino(base.FeatureBase):
+class Domino(base.Feature):
def __init__(self):
super(Domino, self).__init__(project='domino',
case='domino-multinode',
diff --git a/functest/opnfv_tests/features/netready.py b/functest/opnfv_tests/features/netready.py
index dec2a23ce..88f377c26 100644
--- a/functest/opnfv_tests/features/netready.py
+++ b/functest/opnfv_tests/features/netready.py
@@ -8,10 +8,10 @@
#
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class GluonVping(base.FeatureBase):
+class GluonVping(base.Feature):
def __init__(self):
super(GluonVping, self).__init__(project='netready',
diff --git a/functest/opnfv_tests/features/odl_sfc.py b/functest/opnfv_tests/features/odl_sfc.py
index 431cd47e4..fff7f2b0d 100644
--- a/functest/opnfv_tests/features/odl_sfc.py
+++ b/functest/opnfv_tests/features/odl_sfc.py
@@ -7,16 +7,14 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
-import functest.core.feature_base as base
-from sfc.tests.functest import run_tests
+import functest.core.feature as base
-class OpenDaylightSFC(base.FeatureBase):
+class OpenDaylightSFC(base.Feature):
def __init__(self):
super(OpenDaylightSFC, self).__init__(project='sfc',
case='functest-odl-sfc',
repo='dir_repo_sfc')
-
- def execute(self):
- return run_tests.main()
+ dir_sfc_functest = '{}/sfc/tests/functest'.format(self.repo)
+ self.cmd = 'cd %s && python ./run_tests.py' % dir_sfc_functest
diff --git a/functest/opnfv_tests/features/promise.py b/functest/opnfv_tests/features/promise.py
index 15636fbfe..a7f4e6283 100644
--- a/functest/opnfv_tests/features/promise.py
+++ b/functest/opnfv_tests/features/promise.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class Promise(base.FeatureBase):
+class Promise(base.Feature):
def __init__(self):
super(Promise, self).__init__(project='promise',
case='promise',
diff --git a/functest/opnfv_tests/features/sdnvpn.py b/functest/opnfv_tests/features/sdnvpn.py
index 1919a03c2..10e3146c8 100644
--- a/functest/opnfv_tests/features/sdnvpn.py
+++ b/functest/opnfv_tests/features/sdnvpn.py
@@ -7,10 +7,10 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class SdnVpnTests(base.FeatureBase):
+class SdnVpnTests(base.Feature):
def __init__(self):
super(SdnVpnTests, self).__init__(project='sdnvpn',
diff --git a/functest/opnfv_tests/features/security_scan.py b/functest/opnfv_tests/features/security_scan.py
index 58f0ec748..2374b39fe 100644
--- a/functest/opnfv_tests/features/security_scan.py
+++ b/functest/opnfv_tests/features/security_scan.py
@@ -8,11 +8,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
-import functest.core.feature_base as base
+import functest.core.feature as base
from functest.utils.constants import CONST
-class SecurityScan(base.FeatureBase):
+class SecurityScan(base.Feature):
def __init__(self):
super(SecurityScan, self).__init__(project='securityscanning',
case='security_scan',
diff --git a/functest/opnfv_tests/mano/orchestra.py b/functest/opnfv_tests/mano/orchestra.py
index fd5e40d05..a9cf0ae66 100644
--- a/functest/opnfv_tests/mano/orchestra.py
+++ b/functest/opnfv_tests/mano/orchestra.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class Orchestra(base.FeatureBase):
+class Orchestra(base.Feature):
def __init__(self):
super(Orchestra, self).__init__(project='orchestra',
case='orchestra',
diff --git a/functest/opnfv_tests/openstack/refstack_client/refstack_client.py b/functest/opnfv_tests/openstack/refstack_client/refstack_client.py
index 597ece3e4..37aa9e399 100755
--- a/functest/opnfv_tests/openstack/refstack_client/refstack_client.py
+++ b/functest/opnfv_tests/openstack/refstack_client/refstack_client.py
@@ -14,10 +14,10 @@ import time
from functest.core import testcase
from functest.opnfv_tests.openstack.tempest import conf_utils
-from functest.utils import openstack_utils
from functest.utils.constants import CONST
import functest.utils.functest_logger as ft_logger
import functest.utils.functest_utils as ft_utils
+from tempest_conf import TempestConf
""" logging configuration """
logger = ft_logger.Logger("refstack_defcore").getLogger()
@@ -35,12 +35,6 @@ class RefstackClient(testcase.TestCase):
self.CONF_PATH)
self.defcorelist = os.path.join(self.FUNCTEST_TEST,
self.DEFCORE_LIST)
- self.VERIFIER_ID = conf_utils.get_verifier_id()
- self.VERIFIER_REPO_DIR = conf_utils.get_verifier_repo_dir(
- self.VERIFIER_ID)
- self.DEPLOYMENT_ID = conf_utils.get_verifier_deployment_id()
- self.DEPLOYMENT_DIR = conf_utils.get_verifier_deployment_dir(
- self.VERIFIER_ID, self.DEPLOYMENT_ID)
def source_venv(self):
@@ -143,28 +137,18 @@ class RefstackClient(testcase.TestCase):
logger.info("Testcase %s success_rate is %s%%, is marked as %s"
% (self.case_name, success_rate, self.criteria))
- def defcore_env_prepare(self):
- try:
- img_flavor_dict = conf_utils.create_tempest_resources(
- use_custom_images=True, use_custom_flavors=True)
- conf_utils.configure_tempest_defcore(
- self.DEPLOYMENT_DIR, img_flavor_dict)
- self.source_venv()
- res = testcase.TestCase.EX_OK
- except KeyError as e:
- logger.error("defcore prepare env error with: %s", e)
- res = testcase.TestCase.EX_RUN_ERROR
-
- return res
-
def run(self):
+ '''used for functest command line,
+ functest testcase run refstack_defcore'''
self.start_time = time.time()
if not os.path.exists(conf_utils.REFSTACK_RESULTS_DIR):
os.makedirs(conf_utils.REFSTACK_RESULTS_DIR)
try:
- self.defcore_env_prepare()
+ tempestconf = TempestConf()
+ tempestconf.generate_tempestconf()
+ self.source_venv()
self.run_defcore_default()
self.parse_refstack_result()
res = testcase.TestCase.EX_OK
@@ -175,18 +159,31 @@ class RefstackClient(testcase.TestCase):
self.stop_time = time.time()
return res
+ def _prep_test(self):
+ '''Check that the config file exists.'''
+ if not os.path.isfile(self.confpath):
+ logger.error("Conf file not valid: %s" % self.confpath)
+ if not os.path.isfile(self.testlist):
+ logger.error("testlist file not valid: %s" % self.testlist)
+
def main(self, **kwargs):
+ '''used for manually running,
+ python refstack_client.py -c <tempest_conf_path>
+ --testlist <testlist_path>
+ can generate a reference tempest.conf by
+ python tempest_conf.py
+ '''
try:
- tempestconf = kwargs['config']
- testlist = kwargs['testlist']
+ self.confpath = kwargs['config']
+ self.testlist = kwargs['testlist']
except KeyError as e:
logger.error("Cannot run refstack client. Please check "
"%s", e)
return self.EX_RUN_ERROR
try:
- openstack_utils.source_credentials(CONST.openstack_creds)
- self.defcore_env_prepare()
- self.run_defcore(tempestconf, testlist)
+ self.source_venv()
+ self._prep_test()
+ self.run_defcore(self.confpath, self.testlist)
res = testcase.TestCase.EX_OK
except Exception as e:
logger.error('Error with run: %s', e)
diff --git a/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py b/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py
new file mode 100755
index 000000000..5624ed79b
--- /dev/null
+++ b/functest/opnfv_tests/openstack/refstack_client/tempest_conf.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+
+# matthew.lijun@huawei.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+import sys
+
+from functest.core import testcase
+from functest.opnfv_tests.openstack.tempest import conf_utils
+from functest.utils import openstack_utils
+from functest.utils.constants import CONST
+import functest.utils.functest_logger as ft_logger
+
+""" logging configuration """
+logger = ft_logger.Logger("refstack_defcore").getLogger()
+
+
+class TempestConf(object):
+ def __init__(self):
+ self.VERIFIER_ID = conf_utils.get_verifier_id()
+ self.VERIFIER_REPO_DIR = conf_utils.get_verifier_repo_dir(
+ self.VERIFIER_ID)
+ self.DEPLOYMENT_ID = conf_utils.get_verifier_deployment_id()
+ self.DEPLOYMENT_DIR = conf_utils.get_verifier_deployment_dir(
+ self.VERIFIER_ID, self.DEPLOYMENT_ID)
+
+ def generate_tempestconf(self):
+ try:
+ openstack_utils.source_credentials(CONST.openstack_creds)
+ img_flavor_dict = conf_utils.create_tempest_resources(
+ use_custom_images=True, use_custom_flavors=True)
+ conf_utils.configure_tempest_defcore(
+ self.DEPLOYMENT_DIR, img_flavor_dict)
+ except KeyError as e:
+ logger.error("defcore prepare env error with: %s", e)
+
+ def main(self):
+ try:
+ self.generate_tempestconf()
+ res = testcase.TestCase.EX_OK
+ except Exception as e:
+ logger.error('Error with run: %s', e)
+ res = testcase.TestCase.EX_RUN_ERROR
+
+ return res
+
+if __name__ == '__main__':
+ tempestconf = TempestConf()
+ result = tempestconf.main()
+ if result != testcase.TestCase.EX_OK:
+ sys.exit(result)
diff --git a/functest/opnfv_tests/openstack/tempest/custom_tests/blacklist.txt b/functest/opnfv_tests/openstack/tempest/custom_tests/blacklist.txt
index 43edabc10..0da92cd8a 100644
--- a/functest/opnfv_tests/openstack/tempest/custom_tests/blacklist.txt
+++ b/functest/opnfv_tests/openstack/tempest/custom_tests/blacklist.txt
@@ -27,11 +27,7 @@
scenarios:
- os-onos-nofeature-ha
- os-onos-nofeature-noha
- - os-onos-sfc-ha
- - os-onos-sfc-noha
installers:
- - fuel
- - apex
- compass
tests:
- tempest.api.compute.servers.test_server_actions.ServerActionsTestJSON.test_reboot_server_hard
@@ -41,22 +37,6 @@
- tempest.scenario.test_volume_boot_pattern.TestVolumeBootPatternV2.test_volume_boot_pattern
-
- scenarios:
- - os-onos-nofeature-ha
- - os-onos-nofeature-noha
- - os-onos-sfc-ha
- - os-onos-sfc-noha
- installers:
- - joid
- tests:
- - tempest.api.object_storage
- - tempest.api.compute.servers.test_server_actions.ServerActionsTestJSON.test_reboot_server_hard
- - tempest.scenario.test_network_basic_ops.TestNetworkBasicOps.test_network_basic_ops
- - tempest.scenario.test_server_basic_ops.TestServerBasicOps.test_server_basic_ops
- - tempest.scenario.test_volume_boot_pattern.TestVolumeBootPattern.test_volume_boot_pattern
- - tempest.scenario.test_volume_boot_pattern.TestVolumeBootPatternV2.test_volume_boot_pattern
-
--
# https://bugs.launchpad.net/tempest/+bug/1577632
scenarios:
- os-odl_l2-nofeature-ha
@@ -71,3 +51,15 @@
- fuel
tests:
- tempest.scenario.test_server_basic_ops.TestServerBasicOps.test_server_basic_ops
+
+
+ # https://bugs.opendaylight.org/show_bug.cgi?id=5586
+ scenarios:
+ - os-odl-bgpvpn-ha
+ - os-odl-gluon-noha
+ - os-odl_l2-bgpvpn-ha
+ installers:
+ - apex
+ - fuel
+ tests:
+ - tempest.api.compute.servers.test_server_actions.ServerActionsTestJSON.test_reboot_server_hard
diff --git a/functest/opnfv_tests/sdn/odl/odl.py b/functest/opnfv_tests/sdn/odl/odl.py
index f65118f6f..ccc1101a5 100755
--- a/functest/opnfv_tests/sdn/odl/odl.py
+++ b/functest/opnfv_tests/sdn/odl/odl.py
@@ -7,6 +7,15 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
+"""Define classes required to run ODL suites.
+
+It has been designed for any context. But helpers are given for
+running test suites in OPNFV environment.
+
+Example:
+ $ python odl.py
+"""
+
import argparse
import errno
import fileinput
@@ -24,8 +33,11 @@ from functest.core import testcase
import functest.utils.functest_logger as ft_logger
import functest.utils.openstack_utils as op_utils
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
class ODLResultVisitor(robot.api.ResultVisitor):
+ """Visitor to get result details."""
def __init__(self):
self._data = []
@@ -43,10 +55,12 @@ class ODLResultVisitor(robot.api.ResultVisitor):
self._data.append(output)
def get_data(self):
+ """Get the details of the result."""
return self._data
class ODLTests(testcase.TestCase):
+ """ODL test runner."""
repos = "/home/opnfv/repos/"
odl_test_repo = os.path.join(repos, "odl_test")
@@ -64,6 +78,12 @@ class ODLTests(testcase.TestCase):
@classmethod
def set_robotframework_vars(cls, odlusername="admin", odlpassword="admin"):
+ """Set credentials in csit/variables/Variables.py.
+
+ Returns:
+ True if credentials are set.
+ False otherwise.
+ """
odl_variables_files = os.path.join(cls.odl_test_repo,
'csit/variables/Variables.py')
try:
@@ -79,6 +99,7 @@ class ODLTests(testcase.TestCase):
return False
def parse_results(self):
+ """Parse output.xml and get the details in it."""
xml_file = os.path.join(self.res_dir, 'output.xml')
result = robot.api.ExecutionResult(xml_file)
visitor = ODLResultVisitor()
@@ -91,6 +112,34 @@ class ODLTests(testcase.TestCase):
self.details['tests'] = visitor.get_data()
def main(self, suites=None, **kwargs):
+ """Run the test suites
+
+ It has been designed to be called in any context.
+ It requires the following keyword arguments:
+ * odlusername,
+ * odlpassword,
+ * osauthurl,
+ * neutronip,
+ * osusername,
+ * ostenantname,
+ * ospassword,
+ * odlip,
+ * odlwebport,
+ * odlrestconfport.
+
+ Here are the steps:
+ * set all RobotFramework_variables,
+ * create the output directories if required,
+ * get the results in output.xml,
+ * delete temporary files.
+
+ Args:
+ **kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ EX_OK if all suites ran well.
+ EX_RUN_ERROR otherwise.
+ """
try:
if not suites:
suites = self.default_suites
@@ -146,6 +195,18 @@ class ODLTests(testcase.TestCase):
return self.EX_RUN_ERROR
def run(self, **kwargs):
+ """Run suites in OPNFV environment
+
+ It basically check env vars to call main() with the keywords
+ required.
+
+ Args:
+ **kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ EX_OK if all suites ran well.
+ EX_RUN_ERROR otherwise.
+ """
try:
suites = self.default_suites
try:
@@ -191,6 +252,7 @@ class ODLTests(testcase.TestCase):
class ODLParser(object): # pylint: disable=too-few-public-methods
+ """Parser to run ODL test suites."""
def __init__(self):
self.parser = argparse.ArgumentParser()
@@ -229,6 +291,13 @@ class ODLParser(object): # pylint: disable=too-few-public-methods
action='store_true')
def parse_args(self, argv=None):
+ """Parse arguments.
+
+ It can call sys.exit if arguments are incorrect.
+
+ Returns:
+ the arguments from cmdline
+ """
if not argv:
argv = []
return vars(self.parser.parse_args(argv))
diff --git a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
new file mode 100644
index 000000000..2fc5449cf
--- /dev/null
+++ b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2017 All rights reserved
+# This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+import json
+import os
+import shutil
+
+import requests
+
+import functest.core.vnf_base as vnf_base
+from functest.utils.constants import CONST
+import functest.utils.functest_logger as ft_logger
+import functest.utils.functest_utils as ft_utils
+
+
+class ClearwaterOnBoardingBase(vnf_base.VnfOnBoardingBase):
+
+ def __init__(self, project='functest', case='', repo='', cmd=''):
+ self.logger = ft_logger.Logger(__name__).getLogger()
+ super(ClearwaterOnBoardingBase, self).__init__(
+ project, case, repo, cmd)
+ self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf', 'ims')
+ self.data_dir = CONST.dir_ims_data
+ self.result_dir = os.path.join(CONST.dir_results, case)
+ self.test_dir = CONST.dir_repo_vims_test
+
+ if not os.path.exists(self.data_dir):
+ os.makedirs(self.data_dir)
+ if not os.path.exists(self.result_dir):
+ os.makedirs(self.result_dir)
+
+ def config_ellis(self, ellis_ip, signup_code='secret', two_numbers=False):
+ output_dict = {}
+ self.logger.info('Configure Ellis: %s', ellis_ip)
+ output_dict['ellis_ip'] = ellis_ip
+ account_url = 'http://{0}/accounts'.format(ellis_ip)
+ params = {"password": "functest",
+ "full_name": "opnfv functest user",
+ "email": "functest@opnfv.org",
+ "signup_code": signup_code}
+ rq = requests.post(account_url, data=params)
+ output_dict['login'] = params
+ if rq.status_code != 201 and rq.status_code != 409:
+ raise Exception("Unable to create an account for number provision")
+ self.logger.info('Account is created on Ellis: %s', params)
+
+ session_url = 'http://{0}/session'.format(ellis_ip)
+ session_data = {
+ 'username': params['email'],
+ 'password': params['password'],
+ 'email': params['email']
+ }
+ rq = requests.post(session_url, data=session_data)
+ if rq.status_code != 201:
+ raise Exception('Failed to get cookie for Ellis')
+ cookies = rq.cookies
+ self.logger.info('Cookies: %s' % cookies)
+
+ number_url = 'http://{0}/accounts/{1}/numbers'.format(
+ ellis_ip,
+ params['email'])
+ self.logger.info('Create 1st calling number on Ellis')
+ number_res = self.create_ellis_number(number_url, cookies)
+ output_dict['number'] = number_res
+
+ if two_numbers:
+ self.logger.info('Create 2nd calling number on Ellis')
+ number_res = self.create_ellis_number(number_url, cookies)
+ output_dict['number2'] = number_res
+
+ return output_dict
+
+ def create_ellis_number(self, number_url, cookies):
+ rq = requests.post(number_url, cookies=cookies)
+
+ if rq.status_code != 200:
+ if rq and rq.json():
+ reason = rq.json()['reason']
+ else:
+ reason = rq
+ raise Exception("Unable to create a number: %s" % reason)
+ number_res = rq.json()
+ self.logger.info('Calling number is created: %s', number_res)
+ return number_res
+
+ def run_clearwater_live_test(self, dns_ip, public_domain,
+ bono_ip=None, ellis_ip=None,
+ signup_code='secret'):
+ self.logger.info('Run Clearwater live test')
+ nameservers = ft_utils.get_resolvconf_ns()
+ resolvconf = ['{0}{1}{2}'.format(os.linesep, 'nameserver ', ns)
+ for ns in nameservers]
+ self.logger.debug('resolvconf: %s', resolvconf)
+ dns_file = '/etc/resolv.conf'
+ dns_file_bak = '/etc/resolv.conf.bak'
+ shutil.copy(dns_file, dns_file_bak)
+ script = ('echo -e "nameserver {0}{1}" > {2};'
+ 'source /etc/profile.d/rvm.sh;'
+ 'cd {3};'
+ 'rake test[{4}] SIGNUP_CODE={5}'
+ .format(dns_ip,
+ ''.join(resolvconf),
+ dns_file,
+ self.test_dir,
+ public_domain,
+ signup_code))
+ if bono_ip and ellis_ip:
+ subscript = ' PROXY={0} ELLIS={1}'.format(bono_ip, ellis_ip)
+ script = '{0}{1}'.format(script, subscript)
+ script = ('{0}{1}'.format(script, ' --trace'))
+ cmd = "/bin/bash -c '{0}'".format(script)
+ self.logger.info('Live test cmd: %s', cmd)
+ output_file = os.path.join(self.result_dir, "ims_test_output.txt")
+ ft_utils.execute_command(cmd,
+ error_msg='Clearwater live test failed',
+ output_file=output_file)
+
+ with open(dns_file_bak, 'r') as bak_file:
+ result = bak_file.read()
+ with open(dns_file, 'w') as f:
+ f.write(result)
+
+ f = open(output_file, 'r')
+ result = f.read()
+ if result != "":
+ self.logger.debug(result)
+
+ vims_test_result = ""
+ tempFile = os.path.join(self.test_dir, "temp.json")
+ try:
+ self.logger.debug("Trying to load test results")
+ with open(tempFile) as f:
+ vims_test_result = json.load(f)
+ f.close()
+ except Exception:
+ self.logger.error("Unable to retrieve test results")
+
+ try:
+ os.remove(tempFile)
+ except Exception:
+ self.logger.error("Deleting file failed")
+
+ return vims_test_result
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
index f7dfd532f..404f208eb 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
@@ -7,42 +7,37 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-import json
import os
-import requests
-import subprocess
import sys
import time
+
+import requests
import yaml
-import functest.core.vnf_base as vnf_base
+from functest.opnfv_tests.vnf.ims.clearwater import Clearwater
+import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
+from functest.opnfv_tests.vnf.ims.orchestrator_cloudify import Orchestrator
+from functest.utils.constants import CONST
import functest.utils.functest_logger as ft_logger
import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
-from clearwater import Clearwater
-from functest.utils.constants import CONST
-from orchestrator_cloudify import Orchestrator
-
-class ImsVnf(vnf_base.VnfOnBoardingBase):
+class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
def __init__(self, project='functest', case='cloudify_ims',
repo='', cmd=''):
- super(ImsVnf, self).__init__(project, case, repo, cmd)
- self.logger = ft_logger.Logger("vIMS").getLogger()
- self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf/ims/')
- self.data_dir = CONST.dir_ims_data
- self.test_dir = CONST.dir_repo_vims_test
+ super(CloudifyIms, self).__init__(project, case, repo, cmd)
+ self.logger = ft_logger.Logger(__name__).getLogger()
# Retrieve the configuration
try:
self.config = CONST.__getattribute__(
'vnf_{}_config'.format(self.case_name))
- except:
+ except Exception:
raise Exception("VNF config file not found")
- config_file = self.case_dir + self.config
+ config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
requirements=get_config("cloudify.requirements", config_file),
blueprint=get_config("cloudify.blueprint", config_file),
@@ -61,10 +56,6 @@ class ImsVnf(vnf_base.VnfOnBoardingBase):
self.images = get_config("tenant_images", config_file)
self.logger.info("Images needed for vIMS: %s" % self.images)
- # vIMS Data directory creation
- if not os.path.exists(self.data_dir):
- os.makedirs(self.data_dir)
-
def deploy_orchestrator(self, **kwargs):
self.logger.info("Additional pre-configuration steps")
@@ -82,7 +73,7 @@ class ImsVnf(vnf_base.VnfOnBoardingBase):
image_id = os_utils.get_image_id(self.glance_client,
image_name)
self.logger.debug("image_id: %s" % image_id)
- except:
+ except Exception:
self.logger.error("Unexpected error: %s" % sys.exc_info()[0])
if image_id == '':
@@ -177,10 +168,11 @@ class ImsVnf(vnf_base.VnfOnBoardingBase):
self.logger.debug("Resolvconf set")
self.logger.info("Prepare virtualenv for cloudify-cli")
- cmd = "chmod +x " + self.case_dir + "create_venv.sh"
+ venv_scrit_dir = os.path.join(self.case_dir, "create_venv.sh")
+ cmd = "chmod +x " + venv_scrit_dir
ft_utils.execute_command(cmd)
time.sleep(3)
- cmd = self.case_dir + "create_venv.sh " + self.data_dir
+ cmd = venv_scrit_dir + " " + self.data_dir
ft_utils.execute_command(cmd)
cfy.download_manager_blueprint(
@@ -251,99 +243,25 @@ class ImsVnf(vnf_base.VnfOnBoardingBase):
self.logger.debug("Trying to get clearwater manager IP ... ")
mgr_ip = os.popen(cmd).read()
mgr_ip = mgr_ip.splitlines()[0]
- except:
+ except Exception:
self.step_failure("Unable to retrieve the IP of the "
"cloudify manager server !")
- api_url = "http://" + mgr_ip + "/api/v2"
- dep_outputs = requests.get(api_url + "/deployments/" +
- self.vnf['deployment_name'] + "/outputs")
- dns_ip = dep_outputs.json()['outputs']['dns_ip']
- ellis_ip = dep_outputs.json()['outputs']['ellis_ip']
-
- self.logger.debug("DNS ip : %s" % dns_ip)
- self.logger.debug("ELLIS ip : %s" % ellis_ip)
-
- ellis_url = "http://" + ellis_ip + "/"
- url = ellis_url + "accounts"
-
- params = {"password": "functest",
- "full_name": "opnfv functest user",
- "email": "functest@opnfv.fr",
- "signup_code": "secret"}
-
- rq = requests.post(url, data=params)
- i = 30
- while rq.status_code != 201 and i > 0:
- rq = requests.post(url, data=params)
- self.logger.debug("Account creation http status code: %s"
- % rq.status_code)
- i = i - 1
- time.sleep(10)
-
- if rq.status_code == 201:
- url = ellis_url + "session"
- rq = requests.post(url, data=params)
- cookies = rq.cookies
- else:
- self.step_failure("Unable to create an account")
-
- url = ellis_url + "accounts/" + params['email'] + "/numbers"
- if cookies != "":
- rq = requests.post(url, cookies=cookies)
- i = 24
- while rq.status_code != 200 and i > 0:
- rq = requests.post(url, cookies=cookies)
- self.logger.debug("Number creation http status code: %s"
- % rq.status_code)
- i = i - 1
- time.sleep(25)
-
- if rq.status_code != 200:
- self.step_failure("Unable to create a number: %s"
- % rq.json()['reason'])
-
- nameservers = ft_utils.get_resolvconf_ns()
- resolvconf = ""
- for ns in nameservers:
- resolvconf += "\nnameserver " + ns
+ self.logger.info('Cloudify Manager: %s', mgr_ip)
+ api_url = 'http://{0}/api/v2/deployments/{1}/outputs'.format(
+ mgr_ip, self.vnf['deployment_name'])
+ dep_outputs = requests.get(api_url)
+ self.logger.info(api_url)
+ outputs = dep_outputs.json()['outputs']
+ self.logger.info("Deployment outputs: %s", outputs)
+ dns_ip = outputs['dns_ip']
+ ellis_ip = outputs['ellis_ip']
+ self.config_ellis(ellis_ip)
if dns_ip != "":
- script = ('echo -e "nameserver ' + dns_ip + resolvconf +
- '" > /etc/resolv.conf; ')
- script += 'source /etc/profile.d/rvm.sh; '
- script += 'cd {0}; '
- script += ('rake test[{1}] SIGNUP_CODE="secret"')
-
- cmd = ("/bin/bash -c '" +
- script.format(self.data_dir, self.inputs["public_domain"]) +
- "'")
- output_file = "output.txt"
- f = open(output_file, 'w+')
- subprocess.call(cmd, shell=True, stdout=f,
- stderr=subprocess.STDOUT)
- f.close()
-
- f = open(output_file, 'r')
- result = f.read()
- if result != "":
- self.logger.debug(result)
-
- vims_test_result = ""
- tempFile = os.path.join(self.test_dir, "temp.json")
- try:
- self.logger.debug("Trying to load test results")
- with open(tempFile) as f:
- vims_test_result = json.load(f)
- f.close()
- except:
- self.logger.error("Unable to retrieve test results")
-
- try:
- os.remove(tempFile)
- except:
- self.logger.error("Deleting file failed")
-
+ vims_test_result = self.run_clearwater_live_test(
+ dns_ip=dns_ip,
+ public_domain=self.inputs["public_domain"])
if vims_test_result != '':
return {'status': 'PASS', 'result': vims_test_result}
else:
@@ -352,7 +270,7 @@ class ImsVnf(vnf_base.VnfOnBoardingBase):
def clean(self):
self.vnf['object'].undeploy_vnf()
self.orchestrator['object'].undeploy_manager()
- super(ImsVnf, self).clean()
+ super(CloudifyIms, self).clean()
def main(self, **kwargs):
self.logger.info("Cloudify IMS VNF onboarding test starting")
diff --git a/functest/opnfv_tests/vnf/ims/opera_ims.py b/functest/opnfv_tests/vnf/ims/opera_ims.py
index 7ead401fe..d022b3c7f 100755..100644
--- a/functest/opnfv_tests/vnf/ims/opera_ims.py
+++ b/functest/opnfv_tests/vnf/ims/opera_ims.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-# Copyright (c) 2016 Orange and others.
+# Copyright (c) 2017 HUAWEI TECHNOLOGIES CO.,LTD and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
@@ -8,381 +8,117 @@
# http://www.apache.org/licenses/LICENSE-2.0
import json
-import socket
-import sys
+import os
import time
-import yaml
-import functest.core.vnf_base as vnf_base
-import functest.utils.functest_logger as ft_logger
-import functest.utils.functest_utils as ft_utils
-import functest.utils.openstack_utils as os_utils
-import os
+from opera import openo_connect
+import requests
+
+import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
from functest.utils.constants import CONST
+import functest.utils.functest_logger as ft_logger
-from org.openbaton.cli.agents.agents import MainAgent
-from org.openbaton.cli.errors.errors import NfvoException
+class OperaIms(clearwater_ims_base.ClearwaterOnBoardingBase):
-def servertest(host, port):
- args = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
- for family, socktype, proto, canonname, sockaddr in args:
- s = socket.socket(family, socktype, proto)
+ def __init__(self, project='functest', case='opera_ims',
+ repo=CONST.dir_repo_opera, cmd=''):
+ super(OperaIms, self).__init__(project, case, repo, cmd)
+ self.logger = ft_logger.Logger(__name__).getLogger()
+ self.ellis_file = os.path.join(self.result_dir, 'ellis.info')
+ self.live_test_file = os.path.join(self.result_dir,
+ 'live_test_report.json')
try:
- s.connect(sockaddr)
- except socket.error:
- return False
+ self.openo_msb_endpoint = os.environ['OPENO_MSB_ENDPOINT']
+ except KeyError:
+ raise Exception('OPENO_MSB_ENDPOINT is not specified,'
+ ' put it as <OPEN-O ip>:<port>')
else:
- s.close()
- return True
-
-# ----------------------------------------------------------
-#
-# UTILS
-#
-# -----------------------------------------------------------
-
-
-def get_config(parameter, file):
- """
- Returns the value of a given parameter in file.yaml
- parameter must be given in string format with dots
- Example: general.openstack.image_name
- """
- with open(file) as f:
- file_yaml = yaml.safe_load(f)
- f.close()
- value = file_yaml
- for element in parameter.split("."):
- value = value.get(element)
- if value is None:
- raise ValueError("The parameter %s is not defined in"
- " reporting.yaml" % parameter)
- return value
-
-
-def download_and_add_image_on_glance(glance, image_name,
- image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
- image = os_utils.create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
- return image
-
-
-class ImsVnf(vnf_base.VnfOnBoardingBase):
-
- def __init__(self, project='functest', case='orchestra_ims',
- repo='', cmd=''):
- super(ImsVnf, self).__init__(project, case, repo, cmd)
- self.ob_password = "openbaton"
- self.ob_username = "admin"
- self.ob_https = False
- self.ob_port = "8080"
- self.ob_ip = "localhost"
- self.ob_instance_id = ""
- self.logger = ft_logger.Logger("orchestra_ims").getLogger()
- self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf/ims/')
- self.data_dir = CONST.dir_ims_data
- self.test_dir = CONST.dir_repo_vims_test
- self.ob_projectid = ""
- self.keystone_client = os_utils.get_keystone_client()
- self.ob_nsr_id = ""
- self.main_agent = None
- # vIMS Data directory creation
- if not os.path.exists(self.data_dir):
- os.makedirs(self.data_dir)
- # Retrieve the configuration
- try:
- self.config = CONST.__getattribute__(
- 'vnf_{}_config'.format(self.case_name))
- except:
- raise Exception("Orchestra VNF config file not found")
- config_file = self.case_dir + self.config
- self.imagename = get_config("openbaton.imagename", config_file)
- self.market_link = get_config("openbaton.marketplace_link",
- config_file)
- self.images = get_config("tenant_images", config_file)
-
- def deploy_orchestrator(self, **kwargs):
- self.logger.info("Additional pre-configuration steps")
- nova_client = os_utils.get_nova_client()
- neutron_client = os_utils.get_neutron_client()
- glance_client = os_utils.get_glance_client()
-
- # needs some images
- self.logger.info("Upload some OS images if it doesn't exist")
- temp_dir = os.path.join(self.data_dir, "tmp/")
- for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s" % (image_name, image_url))
- try:
- image_id = os_utils.get_image_id(glance_client,
- image_name)
- self.logger.info("image_id: %s" % image_id)
- except:
- self.logger.error("Unexpected error: %s" % sys.exc_info()[0])
-
- if image_id == '':
- self.logger.info("""%s image doesn't exist on glance repository. Try
- downloading this image and upload on glance !""" % image_name)
- image_id = download_and_add_image_on_glance(glance_client,
- image_name,
- image_url,
- temp_dir)
- if image_id == '':
- self.step_failure(
- "Failed to find or upload required OS "
- "image for this deployment")
- network_dic = os_utils.create_network_full(neutron_client,
- "openbaton_mgmt",
- "openbaton_mgmt_subnet",
- "openbaton_router",
- "192.168.100.0/24")
-
- # orchestrator VM flavor
- self.logger.info("Check medium Flavor is available, if not create one")
- flavor_exist, flavor_id = os_utils.get_or_create_flavor(
- "m1.medium",
- "4096",
- '1',
- '2',
- public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
-
- if not network_dic:
- self.logger.error("There has been a problem when creating the "
- "neutron network")
-
- network_id = network_dic["net_id"]
-
- self.logger.info("Creating floating IP for VM in advance...")
- floatip_dic = os_utils.create_floating_ip(neutron_client)
- floatip = floatip_dic['fip_addr']
-
- if floatip is None:
- self.logger.error("Cannot create floating IP.")
-
- userdata = "#!/bin/bash\n"
- userdata += "set -x\n"
- userdata += "set -e\n"
- userdata += "echo \"nameserver 8.8.8.8\" >> /etc/resolv.conf\n"
- userdata += "apt-get install curl\n"
- userdata += ("echo \"rabbitmq_broker_ip=%s\" > ./config_file\n"
- % floatip)
- userdata += "echo \"mysql=no\" >> ./config_file\n"
- userdata += ("echo \"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCuPXrV3"
- "geeHc6QUdyUr/1Z+yQiqLcOskiEGBiXr4z76MK4abiFmDZ18OMQlc"
- "fl0p3kS0WynVgyaOHwZkgy/DIoIplONVr2CKBKHtPK+Qcme2PVnCtv"
- "EqItl/FcD+1h5XSQGoa+A1TSGgCod/DPo+pes0piLVXP8Ph6QS1k7S"
- "ic7JDeRQ4oT1bXYpJ2eWBDMfxIWKZqcZRiGPgMIbJ1iEkxbpeaAd9O"
- "4MiM9nGCPESmed+p54uYFjwEDlAJZShcAZziiZYAvMZhvAhe6USljc"
- "7YAdalAnyD/jwCHuwIrUw/lxo7UdNCmaUxeobEYyyFA1YVXzpNFZya"
- "XPGAAYIJwEq/ openbaton@opnfv\" >> /home/ubuntu/.ssh/aut"
- "horized_keys\n")
- userdata += "cat ./config_file\n"
- userdata += ("curl -s http://get.openbaton.org/bootstrap "
- "> ./bootstrap\n")
- userdata += "export OPENBATON_COMPONENT_AUTOSTART=false\n"
- bootstrap = "sh ./bootstrap release -configFile=./config_file"
- userdata += bootstrap + "\n"
-
- userdata += ("echo \"nfvo.plugin.timeout=300000\" >> "
- "/etc/openbaton/openbaton-nfvo.properties\n")
- userdata += "service openbaton-nfvo restart\n"
- userdata += "service openbaton-vnfm-generic restart\n"
-
- sg_id = os_utils.create_security_group_full(neutron_client,
- "orchestra-sec-group",
- "allowall")
-
- os_utils.create_secgroup_rule(neutron_client, sg_id, "ingress",
- "icmp", 0, 255)
- os_utils.create_secgroup_rule(neutron_client, sg_id, "egress",
- "icmp", 0, 255)
- os_utils.create_secgroup_rule(neutron_client, sg_id, "ingress",
- "tcp", 1, 65535)
- os_utils.create_secgroup_rule(neutron_client, sg_id, "ingress",
- "udp", 1, 65535)
- os_utils.create_secgroup_rule(neutron_client, sg_id, "egress",
- "tcp", 1, 65535)
- os_utils.create_secgroup_rule(neutron_client, sg_id, "egress",
- "udp", 1, 65535)
-
- self.logger.info("Security group set")
-
- self.logger.info("Create instance....")
- self.logger.info("flavor: m1.medium\n"
- "image: %s\n"
- "network_id: %s\n"
- "userdata: %s\n"
- % (self.imagename, network_id, userdata))
-
- instance = os_utils.create_instance_and_wait_for_active(
- "m1.medium",
- os_utils.get_image_id(glance_client, self.imagename),
- network_id,
- "orchestra-openbaton",
- config_drive=False,
- userdata=userdata)
+ self.logger.info('OPEN-O endpoint is: %s', self.openo_msb_endpoint)
- self.ob_instance_id = instance.id
+ def prepare(self):
+ pass
- self.logger.info("Adding sec group to orchestra instance")
- os_utils.add_secgroup_to_instance(nova_client,
- self.ob_instance_id, sg_id)
-
- self.logger.info("Associating floating ip: '%s' to VM '%s' "
- % (floatip, "orchestra-openbaton"))
- if not os_utils.add_floating_ip(nova_client, instance.id, floatip):
- self.logger.error("Cannot associate floating IP to VM.")
- self.step_failure("Cannot associate floating IP to VM.")
-
- self.logger.info("Waiting for nfvo to be up and running...")
- x = 0
- while x < 100:
- if servertest(floatip, "8080"):
- break
- else:
- self.logger.debug("openbaton is not started yet")
- time.sleep(5)
- x += 1
-
- if x == 100:
- self.logger.error("Openbaton is not started correctly")
- self.step_failure("Openbaton is not started correctly")
-
- self.ob_ip = floatip
- self.ob_password = "openbaton"
- self.ob_username = "admin"
- self.ob_https = False
- self.ob_port = "8080"
-
- self.logger.info("Deploy orchestrator: OK")
+ def clean(self):
+ pass
def deploy_vnf(self):
- self.logger.info("vIMS Deployment")
-
- self.main_agent = MainAgent(nfvo_ip=self.ob_ip,
- nfvo_port=self.ob_port,
- https=self.ob_https,
- version=1,
- username=self.ob_username,
- password=self.ob_password)
-
- project_agent = self.main_agent.get_agent("project", self.ob_projectid)
- for p in json.loads(project_agent.find()):
- if p.get("name") == "default":
- self.ob_projectid = p.get("id")
- break
-
- self.logger.debug("project id: %s" % self.ob_projectid)
- if self.ob_projectid == "":
- self.logger.error("Default project id was not found!")
- self.step_failure("Default project id was not found!")
-
- vim_json = {
- "name": "vim-instance",
- "authUrl": os_utils.get_credentials().get("auth_url"),
- "tenant": os_utils.get_credentials().get("tenant_name"),
- "username": os_utils.get_credentials().get("username"),
- "password": os_utils.get_credentials().get("password"),
- # "keyPair": "opnfv",
- # TODO change the keypair to correct value
- # or upload a correct one or remove it
- "securityGroups": [
- "default",
- "orchestra-sec-group"
- ],
- "type": "openstack",
- "location": {
- "name": "opnfv",
- "latitude": "52.525876",
- "longitude": "13.314400"
- }
- }
-
- self.logger.debug("vim: %s" % vim_json)
-
- self.main_agent.get_agent(
- "vim",
- project_id=self.ob_projectid).create(entity=json.dumps(vim_json))
-
- market_agent = self.main_agent.get_agent("market",
- project_id=self.ob_projectid)
-
- nsd = {}
- try:
- self.logger.info("sending: %s" % self.market_link)
- nsd = market_agent.create(entity=self.market_link)
- self.logger.info("Onboarded nsd: " + nsd.get("name"))
- except NfvoException as e:
- self.step_failure(e.message)
-
- nsr_agent = self.main_agent.get_agent("nsr",
- project_id=self.ob_projectid)
- nsd_id = nsd.get('id')
- if nsd_id is None:
- self.step_failure("NSD not onboarded correctly")
-
- nsr = None
try:
- nsr = nsr_agent.create(nsd_id)
- except NfvoException as e:
- self.step_failure(e.message)
-
- if nsr.get('code') is not None:
- self.logger.error(
- "vIMS cannot be deployed: %s -> %s" %
- (nsr.get('code'), nsr.get('message')))
- self.step_failure("vIMS cannot be deployed")
-
- i = 0
- self.logger.info("waiting NSR to go to active...")
- while nsr.get("status") != 'ACTIVE' and nsr.get("status") != 'ERROR':
- i += 1
- if i == 100:
- self.step_failure("After %s sec the nsr did not go to active.."
- % 5 * 100)
- time.sleep(5)
- nsr = json.loads(nsr_agent.find(nsr.get('id')))
-
- if nsr.get("status") == 'ACTIVE':
- deploy_vnf = {'status': "PASS", 'result': nsr}
- self.logger.info("Deploy VNF: OK")
+ openo_connect.create_service(self.openo_msb_endpoint,
+ 'functest_opera',
+ 'VNF for functest testing')
+ except Exception as e:
+ self.logger.error(e)
+ return {'status': 'FAIL', 'result': e}
else:
- deploy_vnf = {'status': "FAIL", 'result': nsr}
- self.logger.error("Deploy VNF: ERROR")
- self.step_failure("Deploy vIMS failed")
- self.ob_nsr_id = nsr.get("id")
- return deploy_vnf
+ self.logger.info('vIMS deployment is kicked off')
+ return {'status': 'PASS', 'result': ''}
- def test_vnf(self):
- # Adaptations probably needed
- # code used for cloudify_ims
- # ruby client on jumphost calling the vIMS on the SUT
- return
+ def dump_info(self, info_file, result):
+ with open(info_file, 'w') as f:
+ self.logger.debug('Save information to file: %s', info_file)
+ json.dump(result, f)
- def clean(self):
- self.main_agent.get_agent(
- "nsr",
- project_id=self.ob_projectid).delete(self.ob_nsr_id)
- time.sleep(5)
- os_utils.delete_instance(nova_client=os_utils.get_nova_client(),
- instance_id=self.ob_instance_id)
- # TODO question is the clean removing also the VM?
- # I think so since is goinf to remove the tenant...
- super(ImsVnf, self).clean()
+ def test_vnf(self):
+ vnfm_ip = openo_connect.get_vnfm_ip(self.openo_msb_endpoint)
+ self.logger.info('VNFM IP: %s', vnfm_ip)
+ vnf_status_url = 'http://{0}:5000/api/v1/model/status'.format(vnfm_ip)
+ vnf_alive = False
+ retry = 15
+
+ self.logger.info('Check the VNF status')
+ while retry > 0:
+ rq = requests.get(vnf_status_url)
+ response = rq.json()
+ vnf_alive = response['vnf_alive']
+ msg = response['msg']
+ self.logger.info(msg)
+ if vnf_alive:
+ break
+ self.logger.info('check again in one minute...')
+ retry = retry - 1
+ time.sleep(60)
+
+ if not vnf_alive:
+ raise Exception('VNF failed to start: {0}'.format(msg))
+
+ ellis_config_url = ('http://{0}:5000/api/v1/model/ellis/configure'
+ .format(vnfm_ip))
+ rq = requests.get(ellis_config_url, timeout=60)
+ if rq.json() and not rq.json()['ellis_ok']:
+ self.logger.error(rq.json()['data'])
+ raise Exception('Failed to configure Ellis')
+
+ self.logger.info('Get Clearwater deployment detail')
+ vnf_info_url = ('http://{0}:5000/api/v1/model/output'
+ .format(vnfm_ip))
+ rq = requests.get(vnf_info_url, timeout=60)
+ data = rq.json()['data']
+ self.logger.info(data)
+ bono_ip = data['bono_ip']
+ ellis_ip = data['ellis_ip']
+ dns_ip = data['dns_ip']
+ result = self.config_ellis(ellis_ip, 'signup', True)
+ self.logger.debug('Ellis Result: %s', result)
+ self.dump_info(self.ellis_file, result)
+
+ if dns_ip:
+ vims_test_result = self.run_clearwater_live_test(
+ dns_ip,
+ 'clearwater.local',
+ bono_ip,
+ ellis_ip,
+ 'signup')
+ if vims_test_result != '':
+ self.dump_info(self.live_test_file, vims_test_result)
+ return {'status': 'PASS', 'result': vims_test_result}
+ else:
+ return {'status': 'FAIL', 'result': ''}
def main(self, **kwargs):
- self.logger.info("Orchestra IMS VNF onboarding test starting")
+ self.logger.info("Start to run Opera vIMS VNF onboarding test")
self.execute()
- self.logger.info("Orchestra IMS VNF onboarding test executed")
+ self.logger.info("Opera vIMS VNF onboarding test finished")
if self.criteria is "PASS":
return self.EX_OK
else:
@@ -391,10 +127,3 @@ class ImsVnf(vnf_base.VnfOnBoardingBase):
def run(self):
kwargs = {}
return self.main(**kwargs)
-
-
-if __name__ == '__main__':
- test = ImsVnf()
- test.deploy_orchestrator()
- test.deploy_vnf()
- test.clean()
diff --git a/functest/opnfv_tests/vnf/rnc/parser.py b/functest/opnfv_tests/vnf/rnc/parser.py
index 1cff72209..133145d74 100644
--- a/functest/opnfv_tests/vnf/rnc/parser.py
+++ b/functest/opnfv_tests/vnf/rnc/parser.py
@@ -15,10 +15,10 @@
# limitations under the License.
#
-import functest.core.feature_base as base
+import functest.core.feature as base
-class Parser(base.FeatureBase):
+class Parser(base.Feature):
def __init__(self):
super(Parser, self).__init__(project='parser',
case='parser-basics',
diff --git a/functest/opnfv_tests/vnf/router/vyos_vrouter.py b/functest/opnfv_tests/vnf/router/vyos_vrouter.py
index 6c50e8375..e188c3fbb 100644
--- a/functest/opnfv_tests/vnf/router/vyos_vrouter.py
+++ b/functest/opnfv_tests/vnf/router/vyos_vrouter.py
@@ -6,14 +6,14 @@
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-import functest.core.feature_base as base
+import functest.core.feature as base
import json
import os
RESULT_DETAILS_FILE = "test_result.json"
-class VrouterVnf(base.FeatureBase):
+class VrouterVnf(base.Feature):
def __init__(self):
super(VrouterVnf, self).__init__(project='functest',
case='vyos_vrouter',
diff --git a/functest/tests/unit/ci/test_prepare_env.py b/functest/tests/unit/ci/test_prepare_env.py
index 540501ff0..714dd13c3 100644
--- a/functest/tests/unit/ci/test_prepare_env.py
+++ b/functest/tests/unit/ci/test_prepare_env.py
@@ -20,6 +20,9 @@ class PrepareEnvTesting(unittest.TestCase):
logging.disable(logging.CRITICAL)
+ def setUp(self):
+ self.prepare_envparser = prepare_env.PrepareEnvParser()
+
@mock.patch('functest.ci.prepare_env.logger.info')
def test_print_separator(self, mock_logger_info):
str = "=============================================="
@@ -81,7 +84,7 @@ class PrepareEnvTesting(unittest.TestCase):
@mock.patch('functest.ci.prepare_env.logger.warning')
def test_check_env_variables_with_scenario(self, mock_logger_warn,
mock_logger_info):
- CONST.DEPLOY_SCENARIO = mock.Mock()
+ CONST.DEPLOY_SCENARIO = 'test_scenario'
prepare_env.check_env_variables()
mock_logger_info.assert_any_call("Checking environment variables"
"...")
@@ -134,6 +137,47 @@ class PrepareEnvTesting(unittest.TestCase):
mock_logger_info.assert_any_call(test_utils.
SubstrMatch(" IS_CI_RUN="))
+ def test_get_deployment_handler_missing_const_vars(self):
+ with mock.patch('functest.ci.prepare_env.'
+ 'factory.Factory.get_handler') as m:
+ CONST.INSTALLER_IP = None
+ prepare_env.get_deployment_handler()
+ self.assertFalse(m.called)
+
+ CONST.INSTALLER_TYPE = None
+ prepare_env.get_deployment_handler()
+ self.assertFalse(m.called)
+
+ @mock.patch('functest.ci.prepare_env.logger.debug')
+ def test_get_deployment_handler_missing_print_deploy_info(self,
+ mock_debug):
+ with mock.patch('functest.ci.prepare_env.'
+ 'factory.Factory.get_handler') as m, \
+ mock.patch('functest.ci.prepare_env.'
+ 'ft_utils.get_parameter_from_yaml',
+ side_effect=ValueError):
+ CONST.INSTALLER_IP = 'test_ip'
+ CONST.INSTALLER_TYPE = 'test_inst_type'
+ opnfv_constants.INSTALLERS = ['test_inst_type']
+ prepare_env.get_deployment_handler()
+ msg = ('Printing deployment info is not supported for '
+ 'test_inst_type')
+ mock_debug.assert_any_call(msg)
+ self.assertFalse(m.called)
+
+ @mock.patch('functest.ci.prepare_env.logger.debug')
+ def test_get_deployment_handler_exception(self, mock_debug):
+ with mock.patch('functest.ci.prepare_env.'
+ 'factory.Factory.get_handler',
+ side_effect=Exception), \
+ mock.patch('functest.ci.prepare_env.'
+ 'ft_utils.get_parameter_from_yaml'):
+ CONST.INSTALLER_IP = 'test_ip'
+ CONST.INSTALLER_TYPE = 'test_inst_type'
+ opnfv_constants.INSTALLERS = ['test_inst_type']
+ prepare_env.get_deployment_handler()
+ self.assertTrue(mock_debug.called)
+
@mock.patch('functest.ci.prepare_env.logger.info')
@mock.patch('functest.ci.prepare_env.logger.debug')
def test_create_directories_missing_dir(self, mock_logger_debug,
@@ -228,6 +272,36 @@ class PrepareEnvTesting(unittest.TestCase):
prepare_env.source_rc_file()
+ @mock.patch('functest.ci.prepare_env.logger.debug')
+ def test_patch_file(self, mock_logger_debug):
+ with mock.patch("__builtin__.open", mock.mock_open()), \
+ mock.patch('functest.ci.prepare_env.yaml.safe_load',
+ return_value={'test_scenario': {'tkey': 'tvalue'}}), \
+ mock.patch('functest.ci.prepare_env.ft_utils.get_functest_yaml',
+ return_value={'tkey1': 'tvalue1'}), \
+ mock.patch('functest.ci.prepare_env.os.remove') as m, \
+ mock.patch('functest.ci.prepare_env.yaml.dump'):
+ CONST.DEPLOY_SCENARIO = 'test_scenario'
+ prepare_env.patch_file('test_file')
+ self.assertTrue(m.called)
+
+ @mock.patch('functest.ci.prepare_env.logger.info')
+ def test_verify_deployment_error(self, mock_logger_error):
+ mock_popen = mock.Mock()
+ attrs = {'poll.return_value': None,
+ 'stdout.readline.return_value': 'ERROR'}
+ mock_popen.configure_mock(**attrs)
+
+ with mock.patch('functest.ci.prepare_env.print_separator') as m, \
+ mock.patch('functest.ci.prepare_env.subprocess.Popen',
+ return_value=mock_popen), \
+ self.assertRaises(Exception) as context:
+ prepare_env.verify_deployment()
+ self.assertTrue(m.called)
+ msg = "Problem while running 'check_os.sh'."
+ mock_logger_error.assert_called_once_with('ERROR')
+ self.assertTrue(msg in context)
+
def _get_rally_creds(self):
return {"type": "ExistingCloud",
"admin": {"username": 'test_user_name',
@@ -271,6 +345,33 @@ class PrepareEnvTesting(unittest.TestCase):
"Rally plugins.")
mock_exec.assert_any_call(cmd, error_msg=error_msg)
+ @mock.patch('functest.ci.prepare_env.logger.debug')
+ def test_install_tempest(self, mock_logger_debug):
+ mock_popen = mock.Mock()
+ attrs = {'poll.return_value': None,
+ 'stdout.readline.return_value': '0'}
+ mock_popen.configure_mock(**attrs)
+
+ CONST.tempest_deployment_name = 'test_dep_name'
+ with mock.patch('functest.ci.prepare_env.'
+ 'ft_utils.execute_command_raise',
+ side_effect=Exception), \
+ mock.patch('functest.ci.prepare_env.subprocess.Popen',
+ return_value=mock_popen), \
+ self.assertRaises(Exception):
+ prepare_env.install_tempest()
+ mock_logger_debug.assert_any_call("Tempest test_dep_name"
+ " does not exist")
+
+ def test_create_flavor(self):
+ with mock.patch('functest.ci.prepare_env.'
+ 'os_utils.get_or_create_flavor',
+ return_value=('test_', None)), \
+ self.assertRaises(Exception) as context:
+ prepare_env.create_flavor()
+ msg = 'Failed to create flavor'
+ self.assertTrue(msg in context)
+
@mock.patch('functest.ci.prepare_env.sys.exit')
@mock.patch('functest.ci.prepare_env.logger.error')
def test_check_environment_missing_file(self, mock_logger_error,
@@ -299,6 +400,7 @@ class PrepareEnvTesting(unittest.TestCase):
mock_logger_info.assert_any_call("Functest environment"
" is installed.")
+ @mock.patch('functest.ci.prepare_env.print_deployment_info')
@mock.patch('functest.ci.prepare_env.check_environment')
@mock.patch('functest.ci.prepare_env.create_flavor')
@mock.patch('functest.ci.prepare_env.install_tempest')
@@ -307,19 +409,21 @@ class PrepareEnvTesting(unittest.TestCase):
@mock.patch('functest.ci.prepare_env.patch_config_file')
@mock.patch('functest.ci.prepare_env.source_rc_file')
@mock.patch('functest.ci.prepare_env.create_directories')
+ @mock.patch('functest.ci.prepare_env.get_deployment_handler')
@mock.patch('functest.ci.prepare_env.check_env_variables')
@mock.patch('functest.ci.prepare_env.logger.info')
- def test_main_start(self, mock_logger_info, mock_env_var,
+ def test_main_start(self, mock_logger_info, mock_env_var, mock_dep_handler,
mock_create_dir, mock_source_rc, mock_patch_config,
mock_verify_depl, mock_install_rally,
mock_install_temp, mock_create_flavor,
- mock_check_env):
+ mock_check_env, mock_print_info):
with mock.patch("__builtin__.open", mock.mock_open()) as m:
args = {'action': 'start'}
self.assertEqual(prepare_env.main(**args), 0)
mock_logger_info.assert_any_call("######### Preparing Functest "
"environment #########\n")
self.assertTrue(mock_env_var.called)
+ self.assertTrue(mock_dep_handler.called)
self.assertTrue(mock_create_dir.called)
self.assertTrue(mock_source_rc.called)
self.assertTrue(mock_patch_config.called)
@@ -329,6 +433,7 @@ class PrepareEnvTesting(unittest.TestCase):
self.assertTrue(mock_create_flavor.called)
m.assert_called_once_with(CONST.env_active, "w")
self.assertTrue(mock_check_env.called)
+ self.assertTrue(mock_print_info.called)
@mock.patch('functest.ci.prepare_env.check_environment')
def test_main_check(self, mock_check_env):
diff --git a/functest/tests/unit/ci/test_run_tests.py b/functest/tests/unit/ci/test_run_tests.py
index 021406109..7d02b1af1 100644
--- a/functest/tests/unit/ci/test_run_tests.py
+++ b/functest/tests/unit/ci/test_run_tests.py
@@ -37,6 +37,9 @@ class RunTestsTesting(unittest.TestCase):
attrs = {'get_tiers.return_value': [self.tier]}
self.tiers.configure_mock(**attrs)
+ self.run_tests_parser = run_tests.RunTestsParser()
+ self.global_variables = run_tests.GlobalVariables()
+
@mock.patch('functest.ci.run_tests.logger.info')
def test_print_separator(self, mock_logger_info):
run_tests.print_separator(self.sep)
@@ -121,8 +124,8 @@ class RunTestsTesting(unittest.TestCase):
def test_run_tests_import_test_class_exception(self):
mock_test = mock.Mock()
- args = {'get_name': 'test_name',
- 'needs_clean': False}
+ args = {'get_name.return_value': 'test_name',
+ 'needs_clean.return_value': False}
mock_test.configure_mock(**args)
with mock.patch('functest.ci.run_tests.print_separator'),\
mock.patch('functest.ci.run_tests.source_rc_file'), \
@@ -133,6 +136,28 @@ class RunTestsTesting(unittest.TestCase):
msg = "Cannot import the class for the test case."
self.assertTrue(msg in context)
+ def test_run_tests_default(self):
+ mock_test = mock.Mock()
+ args = {'get_name.return_value': 'test_name',
+ 'needs_clean.return_value': True}
+ mock_test.configure_mock(**args)
+ test_run_dict = {'module': 'test_module',
+ 'class': mock.Mock,
+ 'args': 'test_args'}
+ with mock.patch('functest.ci.run_tests.print_separator'),\
+ mock.patch('functest.ci.run_tests.source_rc_file'), \
+ mock.patch('functest.ci.run_tests.generate_os_snapshot'), \
+ mock.patch('functest.ci.run_tests.cleanup'), \
+ mock.patch('functest.ci.run_tests.update_test_info'), \
+ mock.patch('functest.ci.run_tests.get_run_dict',
+ return_value=test_run_dict), \
+ mock.patch('functest.ci.run_tests.generate_report.main'), \
+ self.assertRaises(run_tests.BlockingTestFailed) as context:
+ run_tests.GlobalVariables.CLEAN_FLAG = True
+ run_tests.run_test(mock_test, 'tier_name')
+ msg = 'The test case test_name failed and is blocking'
+ self.assertTrue(msg in context)
+
@mock.patch('functest.ci.run_tests.logger.info')
def test_run_tier_default(self, mock_logger_info):
with mock.patch('functest.ci.run_tests.print_separator'), \
@@ -187,6 +212,61 @@ class RunTestsTesting(unittest.TestCase):
self.assertEqual(run_tests.main(**kwargs),
run_tests.Result.EX_ERROR)
+ def test_main_default(self):
+ kwargs = {'test': 'test_name', 'noclean': True, 'report': True}
+ mock_obj = mock.Mock()
+ args = {'get_tier.return_value': True,
+ 'get_test.return_value': False}
+ mock_obj.configure_mock(**args)
+ with mock.patch('functest.ci.run_tests.tb.TierBuilder',
+ return_value=mock_obj), \
+ mock.patch('functest.ci.run_tests.source_rc_file'), \
+ mock.patch('functest.ci.run_tests.generate_report.init'), \
+ mock.patch('functest.ci.run_tests.run_tier') as m:
+ self.assertEqual(run_tests.main(**kwargs),
+ run_tests.Result.EX_OK)
+ self.assertTrue(m.called)
+
+ mock_obj = mock.Mock()
+ args = {'get_tier.return_value': False,
+ 'get_test.return_value': True}
+ mock_obj.configure_mock(**args)
+ with mock.patch('functest.ci.run_tests.tb.TierBuilder',
+ return_value=mock_obj), \
+ mock.patch('functest.ci.run_tests.source_rc_file'), \
+ mock.patch('functest.ci.run_tests.generate_report.init'), \
+ mock.patch('functest.ci.run_tests.run_test') as m:
+ self.assertEqual(run_tests.main(**kwargs),
+ run_tests.Result.EX_OK)
+ self.assertTrue(m.called)
+
+ kwargs = {'test': 'all', 'noclean': True, 'report': True}
+ mock_obj = mock.Mock()
+ args = {'get_tier.return_value': False,
+ 'get_test.return_value': False}
+ mock_obj.configure_mock(**args)
+ with mock.patch('functest.ci.run_tests.tb.TierBuilder',
+ return_value=mock_obj), \
+ mock.patch('functest.ci.run_tests.source_rc_file'), \
+ mock.patch('functest.ci.run_tests.generate_report.init'), \
+ mock.patch('functest.ci.run_tests.run_all') as m:
+ self.assertEqual(run_tests.main(**kwargs),
+ run_tests.Result.EX_OK)
+ self.assertTrue(m.called)
+
+ kwargs = {'test': 'any', 'noclean': True, 'report': True}
+ mock_obj = mock.Mock()
+ args = {'get_tier.return_value': False,
+ 'get_test.return_value': False}
+ mock_obj.configure_mock(**args)
+ with mock.patch('functest.ci.run_tests.tb.TierBuilder',
+ return_value=mock_obj), \
+ mock.patch('functest.ci.run_tests.source_rc_file'), \
+ mock.patch('functest.ci.run_tests.generate_report.init'), \
+ mock.patch('functest.ci.run_tests.logger.debug') as m:
+ self.assertEqual(run_tests.main(**kwargs),
+ run_tests.Result.EX_ERROR)
+ self.assertTrue(m.called)
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_tier_handler.py b/functest/tests/unit/ci/test_tier_handler.py
index 01d99d7e7..21df4098b 100644
--- a/functest/tests/unit/ci/test_tier_handler.py
+++ b/functest/tests/unit/ci/test_tier_handler.py
@@ -41,6 +41,15 @@ class TierHandlerTesting(unittest.TestCase):
self.dependency = tier_handler.Dependency('test_installer',
'test_scenario')
+ self.testcase.str = self.testcase.__str__()
+ self.dependency.str = self.dependency.__str__()
+ self.tier.str = self.tier.__str__()
+
+ def test_split_text(self):
+ test_str = 'this is for testing'
+ self.assertEqual(tier_handler.split_text(test_str, 10),
+ ['this is ', 'for ', 'testing '])
+
def test_add_test(self):
self.tier.add_test(self.test)
self.assertEqual(self.tier.tests_array,
diff --git a/functest/tests/unit/core/test_testcase_base.py b/functest/tests/unit/core/test_testcase.py
index 9ee1609fc..32104194b 100644
--- a/functest/tests/unit/core/test_testcase_base.py
+++ b/functest/tests/unit/core/test_testcase.py
@@ -7,6 +7,8 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
+"""Define the classe required to fully cover testcase."""
+
import logging
import unittest
@@ -14,9 +16,14 @@ import mock
from functest.core import testcase
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
class TestCaseTesting(unittest.TestCase):
+ """The class testing TestCase."""
+ # pylint: disable=missing-docstring
+
logging.disable(logging.CRITICAL)
def setUp(self):
diff --git a/functest/tests/unit/odl/test_odl.py b/functest/tests/unit/odl/test_odl.py
index 6967eb1a8..e08deb27d 100644
--- a/functest/tests/unit/odl/test_odl.py
+++ b/functest/tests/unit/odl/test_odl.py
@@ -7,14 +7,16 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
+"""Define the classes required to fully cover odl."""
+
import errno
import logging
-import mock
import os
import StringIO
import unittest
from keystoneauth1.exceptions import auth_plugins
+import mock
from robot.errors import DataError, RobotError
from robot.result import testcase as result_testcase
from robot.utils.robottime import timestamp_to_secs
@@ -22,9 +24,50 @@ from robot.utils.robottime import timestamp_to_secs
from functest.core import testcase
from functest.opnfv_tests.sdn.odl import odl
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+
+class ODLVisitorTesting(unittest.TestCase):
+
+ """The class testing ODLResultVisitor."""
+ # pylint: disable=missing-docstring
+
+ logging.disable(logging.CRITICAL)
+
+ def setUp(self):
+ self.visitor = odl.ODLResultVisitor()
+
+ def test_empty(self):
+ self.assertFalse(self.visitor.get_data())
+
+ def test_ok(self):
+ data = {'name': 'foo',
+ 'parent': 'bar',
+ 'status': 'PASS',
+ 'starttime': "20161216 16:00:00.000",
+ 'endtime': "20161216 16:00:01.000",
+ 'elapsedtime': 1000,
+ 'text': 'Hello, World!',
+ 'critical': True}
+ test = result_testcase.TestCase(name=data['name'],
+ status=data['status'],
+ message=data['text'],
+ starttime=data['starttime'],
+ endtime=data['endtime'])
+ test.parent = mock.Mock()
+ config = {'name': data['parent'],
+ 'criticality.test_is_critical.return_value': data[
+ 'critical']}
+ test.parent.configure_mock(**config)
+ self.visitor.visit_test(test)
+ self.assertEqual(self.visitor.get_data(), [data])
+
class ODLTesting(unittest.TestCase):
+ """The super class which testing classes could inherit."""
+ # pylint: disable=missing-docstring
+
logging.disable(logging.CRITICAL)
_keystone_ip = "127.0.0.1"
@@ -60,39 +103,19 @@ class ODLTesting(unittest.TestCase):
'odlrestconfport': self._odl_restconfport,
'pushtodb': False}
- def test_empty_visitor(self):
- visitor = odl.ODLResultVisitor()
- self.assertFalse(visitor.get_data())
- def test_visitor(self):
- visitor = odl.ODLResultVisitor()
- data = {'name': 'foo',
- 'parent': 'bar',
- 'status': 'PASS',
- 'starttime': "20161216 16:00:00.000",
- 'endtime': "20161216 16:00:01.000",
- 'elapsedtime': 1000,
- 'text': 'Hello, World!',
- 'critical': True}
- test = result_testcase.TestCase(name=data['name'],
- status=data['status'],
- message=data['text'],
- starttime=data['starttime'],
- endtime=data['endtime'])
- test.parent = mock.Mock()
- config = {'name': data['parent'],
- 'criticality.test_is_critical.return_value': data[
- 'critical']}
- test.parent.configure_mock(**config)
- visitor.visit_test(test)
- self.assertEqual(visitor.get_data(), [data])
+class ODLParseResultTesting(ODLTesting):
+
+ """The class testing ODLTests.parse_results()."""
+ # pylint: disable=missing-docstring
@mock.patch('robot.api.ExecutionResult', side_effect=DataError)
- def test_parse_results_raises_exceptions(self, *args):
+ def test_raises_exc(self, mock_method):
with self.assertRaises(DataError):
self.test.parse_results()
+ mock_method.assert_called_once_with()
- def test_parse_results(self, *args):
+ def test_ok(self):
config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000',
'endtime': '20161216 16:00:01.000', 'status': 'PASS'}
suite = mock.Mock()
@@ -108,16 +131,28 @@ class ODLTesting(unittest.TestCase):
self.assertEqual(self.test.details,
{'description': config['name'], 'tests': []})
+
+class ODLRobotTesting(ODLTesting):
+
+ """The class testing ODLTests.set_robotframework_vars()."""
+ # pylint: disable=missing-docstring
+
@mock.patch('fileinput.input', side_effect=Exception())
- def test_set_robotframework_vars_failed(self, *args):
+ def test_set_vars_ko(self, mock_method):
self.assertFalse(self.test.set_robotframework_vars())
+ mock_method.assert_called_once_with(
+ os.path.join(odl.ODLTests.odl_test_repo,
+ 'csit/variables/Variables.py'), inplace=True)
@mock.patch('fileinput.input', return_value=[])
- def test_set_robotframework_vars_empty(self, args):
+ def test_set_vars_empty(self, mock_method):
self.assertTrue(self.test.set_robotframework_vars())
+ mock_method.assert_called_once_with(
+ os.path.join(odl.ODLTests.odl_test_repo,
+ 'csit/variables/Variables.py'), inplace=True)
@mock.patch('sys.stdout', new_callable=StringIO.StringIO)
- def _test_set_robotframework_vars(self, msg1, msg2, *args):
+ def _test_set_vars(self, msg1, msg2, *args):
line = mock.MagicMock()
line.__iter__.return_value = [msg1]
with mock.patch('fileinput.input', return_value=line) as mock_method:
@@ -127,15 +162,15 @@ class ODLTesting(unittest.TestCase):
'csit/variables/Variables.py'), inplace=True)
self.assertEqual(args[0].getvalue(), "{}\n".format(msg2))
- def test_set_robotframework_vars_auth_default(self):
- self._test_set_robotframework_vars("AUTH = []",
- "AUTH = [u'admin', u'admin']")
+ def test_set_vars_auth_default(self):
+ self._test_set_vars("AUTH = []",
+ "AUTH = [u'admin', u'admin']")
- def test_set_robotframework_vars_auth1(self):
- self._test_set_robotframework_vars("AUTH1 = []", "AUTH1 = []")
+ def test_set_vars_auth1(self):
+ self._test_set_vars("AUTH1 = []", "AUTH1 = []")
@mock.patch('sys.stdout', new_callable=StringIO.StringIO)
- def test_set_robotframework_vars_auth_foo(self, *args):
+ def test_set_vars_auth_foo(self, *args):
line = mock.MagicMock()
line.__iter__.return_value = ["AUTH = []"]
with mock.patch('fileinput.input', return_value=line) as mock_method:
@@ -146,15 +181,11 @@ class ODLTesting(unittest.TestCase):
self.assertEqual(args[0].getvalue(),
"AUTH = [u'{}', u'{}']\n".format('foo', 'bar'))
- @classmethod
- def _fake_url_for(cls, service_type='identity', **kwargs):
- if service_type == 'identity':
- return "http://{}:5000/v2.0".format(
- ODLTesting._keystone_ip)
- elif service_type == 'network':
- return "http://{}:9696".format(ODLTesting._neutron_ip)
- else:
- return None
+
+class ODLMainTesting(ODLTesting):
+
+ """The class testing ODLTests.main()."""
+ # pylint: disable=missing-docstring
def _get_main_kwargs(self, key=None):
kwargs = {'odlusername': self._odl_username,
@@ -199,50 +230,50 @@ class ODLTesting(unittest.TestCase):
args[2].assert_called_with(
os.path.join(odl.ODLTests.res_dir, 'stdout.txt'))
- def _test_main_missing_keyword(self, key):
+ def _test_no_keyword(self, key):
kwargs = self._get_main_kwargs(key)
self.assertEqual(self.test.main(**kwargs),
testcase.TestCase.EX_RUN_ERROR)
- def test_main_missing_odlusername(self):
- self._test_main_missing_keyword('odlusername')
+ def test_no_odlusername(self):
+ self._test_no_keyword('odlusername')
- def test_main_missing_odlpassword(self):
- self._test_main_missing_keyword('odlpassword')
+ def test_no_odlpassword(self):
+ self._test_no_keyword('odlpassword')
- def test_main_missing_neutronip(self):
- self._test_main_missing_keyword('neutronip')
+ def test_no_neutronip(self):
+ self._test_no_keyword('neutronip')
- def test_main_missing_osauthurl(self):
- self._test_main_missing_keyword('osauthurl')
+ def test_no_osauthurl(self):
+ self._test_no_keyword('osauthurl')
- def test_main_missing_osusername(self):
- self._test_main_missing_keyword('osusername')
+ def test_no_osusername(self):
+ self._test_no_keyword('osusername')
- def test_main_missing_ostenantname(self):
- self._test_main_missing_keyword('ostenantname')
+ def test_no_ostenantname(self):
+ self._test_no_keyword('ostenantname')
- def test_main_missing_ospassword(self):
- self._test_main_missing_keyword('ospassword')
+ def test_no_ospassword(self):
+ self._test_no_keyword('ospassword')
- def test_main_missing_odlip(self):
- self._test_main_missing_keyword('odlip')
+ def test_no_odlip(self):
+ self._test_no_keyword('odlip')
- def test_main_missing_odlwebport(self):
- self._test_main_missing_keyword('odlwebport')
+ def test_no_odlwebport(self):
+ self._test_no_keyword('odlwebport')
- def test_main_missing_odlrestconfport(self):
- self._test_main_missing_keyword('odlrestconfport')
+ def test_no_odlrestconfport(self):
+ self._test_no_keyword('odlrestconfport')
- def test_main_set_robotframework_vars_failed(self):
+ def test_set_vars_ko(self):
with mock.patch.object(self.test, 'set_robotframework_vars',
- return_value=False):
+ return_value=False) as mock_object:
self._test_main(testcase.TestCase.EX_RUN_ERROR)
- self.test.set_robotframework_vars.assert_called_once_with(
+ mock_object.assert_called_once_with(
self._odl_username, self._odl_password)
@mock.patch('os.makedirs', side_effect=Exception)
- def test_main_makedirs_exception(self, mock_method):
+ def test_makedirs_exc(self, mock_method):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
self.assertRaises(Exception):
@@ -250,7 +281,7 @@ class ODLTesting(unittest.TestCase):
mock_method)
@mock.patch('os.makedirs', side_effect=OSError)
- def test_main_makedirs_oserror(self, mock_method):
+ def test_makedirs_oserror(self, mock_method):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True):
self._test_main(testcase.TestCase.EX_RUN_ERROR,
@@ -258,7 +289,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('robot.run', side_effect=RobotError)
@mock.patch('os.makedirs')
- def test_main_robot_run_failed(self, *args):
+ def test_run_ko(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(odl, 'open', mock.mock_open(),
@@ -268,7 +299,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('robot.run')
@mock.patch('os.makedirs')
- def test_main_parse_results_failed(self, *args):
+ def test_parse_results_ko(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(odl, 'open', mock.mock_open(),
@@ -280,7 +311,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('os.remove', side_effect=Exception)
@mock.patch('robot.run')
@mock.patch('os.makedirs')
- def test_main_remove_exception(self, *args):
+ def test_remove_exc(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(self.test, 'parse_results'), \
@@ -290,7 +321,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('os.remove')
@mock.patch('robot.run')
@mock.patch('os.makedirs')
- def test_main(self, *args):
+ def test_ok(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(odl, 'open', mock.mock_open(),
@@ -301,7 +332,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('os.remove')
@mock.patch('robot.run')
@mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
- def test_main_makedirs_oserror17(self, *args):
+ def test_makedirs_oserror17(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(odl, 'open', mock.mock_open(),
@@ -312,7 +343,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('os.remove')
@mock.patch('robot.run', return_value=1)
@mock.patch('os.makedirs')
- def test_main_testcases_in_failure(self, *args):
+ def test_testcases_in_failure(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(odl, 'open', mock.mock_open(),
@@ -323,7 +354,7 @@ class ODLTesting(unittest.TestCase):
@mock.patch('os.remove', side_effect=OSError)
@mock.patch('robot.run')
@mock.patch('os.makedirs')
- def test_main_remove_oserror(self, *args):
+ def test_remove_oserror(self, *args):
with mock.patch.object(self.test, 'set_robotframework_vars',
return_value=True), \
mock.patch.object(odl, 'open', mock.mock_open(),
@@ -331,7 +362,23 @@ class ODLTesting(unittest.TestCase):
mock.patch.object(self.test, 'parse_results'):
self._test_main(testcase.TestCase.EX_OK, *args)
- def _test_run_missing_env_var(self, var):
+
+class ODLRunTesting(ODLTesting):
+
+ """The class testing ODLTests.run()."""
+ # pylint: disable=missing-docstring
+
+ @classmethod
+ def _fake_url_for(cls, service_type='identity'):
+ if service_type == 'identity':
+ return "http://{}:5000/v2.0".format(
+ ODLTesting._keystone_ip)
+ elif service_type == 'network':
+ return "http://{}:9696".format(ODLTesting._neutron_ip)
+ else:
+ return None
+
+ def _test_no_env_var(self, var):
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
del os.environ[var]
@@ -339,8 +386,12 @@ class ODLTesting(unittest.TestCase):
testcase.TestCase.EX_RUN_ERROR)
def _test_run(self, status=testcase.TestCase.EX_OK,
- exception=None, odlip="127.0.0.3", odlwebport="8080",
- odlrestconfport="8181"):
+ exception=None, **kwargs):
+ odlip = kwargs['odlip'] if 'odlip' in kwargs else '127.0.0.3'
+ odlwebport = kwargs['odlwebport'] if 'odlwebport' in kwargs else '8080'
+ odlrestconfport = (kwargs['odlrestconfport']
+ if 'odlrestconfport' in kwargs else '8181')
+
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
if exception:
@@ -358,11 +409,13 @@ class ODLTesting(unittest.TestCase):
ospassword=self._os_password, ostenantname=self._os_tenantname,
osusername=self._os_username)
- def _test_run_defining_multiple_suites(
- self, suites,
- status=testcase.TestCase.EX_OK,
- exception=None, odlip="127.0.0.3", odlwebport="8080",
- odlrestconfport="8181"):
+ def _test_multiple_suites(self, suites,
+ status=testcase.TestCase.EX_OK,
+ exception=None, **kwargs):
+ odlip = kwargs['odlip'] if 'odlip' in kwargs else '127.0.0.3'
+ odlwebport = kwargs['odlwebport'] if 'odlwebport' in kwargs else '8080'
+ odlrestconfport = (kwargs['odlrestconfport']
+ if 'odlrestconfport' in kwargs else '8181')
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
if exception:
@@ -380,31 +433,31 @@ class ODLTesting(unittest.TestCase):
ospassword=self._os_password, ostenantname=self._os_tenantname,
osusername=self._os_username)
- def test_run_exception(self):
+ def test_exc(self):
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=auth_plugins.MissingAuthPlugin()):
self.assertEqual(self.test.run(),
testcase.TestCase.EX_RUN_ERROR)
- def test_run_missing_os_auth_url(self):
- self._test_run_missing_env_var("OS_AUTH_URL")
+ def test_no_os_auth_url(self):
+ self._test_no_env_var("OS_AUTH_URL")
- def test_run_missing_os_username(self):
- self._test_run_missing_env_var("OS_USERNAME")
+ def test_no_os_username(self):
+ self._test_no_env_var("OS_USERNAME")
- def test_run_missing_os_password(self):
- self._test_run_missing_env_var("OS_PASSWORD")
+ def test_no_os_password(self):
+ self._test_no_env_var("OS_PASSWORD")
- def test_run_missing_os_tenant_name(self):
- self._test_run_missing_env_var("OS_TENANT_NAME")
+ def test_no_os_tenant_name(self):
+ self._test_no_env_var("OS_TENANT_NAME")
- def test_run_main_false(self):
+ def test_main_false(self):
os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
self._test_run(testcase.TestCase.EX_RUN_ERROR,
odlip=self._sdn_controller_ip,
odlwebport=self._odl_webport)
- def test_run_main_exception(self):
+ def test_main_exc(self):
with self.assertRaises(Exception):
os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
self._test_run(status=testcase.TestCase.EX_RUN_ERROR,
@@ -412,147 +465,155 @@ class ODLTesting(unittest.TestCase):
odlip=self._sdn_controller_ip,
odlwebport=self._odl_webport)
- def test_run_missing_sdn_controller_ip(self):
+ def test_no_sdn_controller_ip(self):
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
self.assertEqual(self.test.run(),
testcase.TestCase.EX_RUN_ERROR)
- def test_run_without_installer_type(self):
+ def test_without_installer_type(self):
os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
self._test_run(testcase.TestCase.EX_OK,
odlip=self._sdn_controller_ip,
odlwebport=self._odl_webport)
- def test_run_redefining_suites(self):
+ def test_suites(self):
os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
- self._test_run_defining_multiple_suites(
+ self._test_multiple_suites(
[odl.ODLTests.basic_suite_dir],
testcase.TestCase.EX_OK,
odlip=self._sdn_controller_ip,
odlwebport=self._odl_webport)
- def test_run_fuel(self):
+ def test_fuel(self):
os.environ["INSTALLER_TYPE"] = "fuel"
self._test_run(testcase.TestCase.EX_OK,
odlip=self._neutron_ip, odlwebport='8282')
- def test_run_apex_missing_sdn_controller_ip(self):
+ def test_apex_no_controller_ip(self):
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
os.environ["INSTALLER_TYPE"] = "apex"
self.assertEqual(self.test.run(),
testcase.TestCase.EX_RUN_ERROR)
- def test_run_apex(self):
+ def test_apex(self):
os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
os.environ["INSTALLER_TYPE"] = "apex"
self._test_run(testcase.TestCase.EX_OK,
odlip=self._sdn_controller_ip, odlwebport='8081',
odlrestconfport='8081')
- def test_run_netvirt_missing_sdn_controller_ip(self):
+ def test_netvirt_no_controller_ip(self):
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
os.environ["INSTALLER_TYPE"] = "netvirt"
self.assertEqual(self.test.run(),
testcase.TestCase.EX_RUN_ERROR)
- def test_run_netvirt(self):
+ def test_netvirt(self):
os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
os.environ["INSTALLER_TYPE"] = "netvirt"
self._test_run(testcase.TestCase.EX_OK,
odlip=self._sdn_controller_ip, odlwebport='8081',
odlrestconfport='8081')
- def test_run_joid_missing_sdn_controller(self):
+ def test_joid_no_controller_ip(self):
with mock.patch('functest.utils.openstack_utils.get_endpoint',
side_effect=self._fake_url_for):
os.environ["INSTALLER_TYPE"] = "joid"
self.assertEqual(self.test.run(),
testcase.TestCase.EX_RUN_ERROR)
- def test_run_joid(self):
+ def test_joid(self):
os.environ["SDN_CONTROLLER"] = self._sdn_controller_ip
os.environ["INSTALLER_TYPE"] = "joid"
self._test_run(testcase.TestCase.EX_OK,
odlip=self._sdn_controller_ip, odlwebport='8080')
- def test_run_compass(self, *args):
+ def test_compass(self):
os.environ["INSTALLER_TYPE"] = "compass"
self._test_run(testcase.TestCase.EX_OK,
odlip=self._neutron_ip, odlwebport='8181')
- def test_argparser_default(self):
- parser = odl.ODLParser()
- self.assertEqual(parser.parse_args(), self.defaultargs)
- def test_argparser_basic(self):
+class ODLArgParserTesting(ODLTesting):
+
+ """The class testing ODLParser."""
+ # pylint: disable=missing-docstring
+
+ def setUp(self):
+ self.parser = odl.ODLParser()
+ super(ODLArgParserTesting, self).setUp()
+
+ def test_default(self):
+ self.assertEqual(self.parser.parse_args(), self.defaultargs)
+
+ def test_basic(self):
self.defaultargs['neutronip'] = self._neutron_ip
self.defaultargs['odlip'] = self._sdn_controller_ip
- parser = odl.ODLParser()
- self.assertEqual(parser.parse_args(
- ["--neutronip={}".format(self._neutron_ip),
- "--odlip={}".format(self._sdn_controller_ip)
- ]), self.defaultargs)
+ self.assertEqual(
+ self.parser.parse_args(
+ ["--neutronip={}".format(self._neutron_ip),
+ "--odlip={}".format(self._sdn_controller_ip)]),
+ self.defaultargs)
@mock.patch('sys.stderr', new_callable=StringIO.StringIO)
- def test_argparser_fail(self, *args):
+ def test_fail(self, mock_method):
self.defaultargs['foo'] = 'bar'
- parser = odl.ODLParser()
with self.assertRaises(SystemExit):
- parser.parse_args(["--foo=bar"])
+ self.parser.parse_args(["--foo=bar"])
+ mock_method.assert_called_once_with()
- def _test_argparser(self, arg, value):
+ def _test_arg(self, arg, value):
self.defaultargs[arg] = value
- parser = odl.ODLParser()
- self.assertEqual(parser.parse_args(["--{}={}".format(arg, value)]),
- self.defaultargs)
+ self.assertEqual(
+ self.parser.parse_args(["--{}={}".format(arg, value)]),
+ self.defaultargs)
- def test_argparser_odlusername(self):
- self._test_argparser('odlusername', 'foo')
+ def test_odlusername(self):
+ self._test_arg('odlusername', 'foo')
- def test_argparser_odlpassword(self):
- self._test_argparser('odlpassword', 'foo')
+ def test_odlpassword(self):
+ self._test_arg('odlpassword', 'foo')
- def test_argparser_osauthurl(self):
- self._test_argparser('osauthurl', 'http://127.0.0.4:5000/v2')
+ def test_osauthurl(self):
+ self._test_arg('osauthurl', 'http://127.0.0.4:5000/v2')
- def test_argparser_neutronip(self):
- self._test_argparser('neutronip', '127.0.0.4')
+ def test_neutronip(self):
+ self._test_arg('neutronip', '127.0.0.4')
- def test_argparser_osusername(self):
- self._test_argparser('osusername', 'foo')
+ def test_osusername(self):
+ self._test_arg('osusername', 'foo')
- def test_argparser_ostenantname(self):
- self._test_argparser('ostenantname', 'foo')
+ def test_ostenantname(self):
+ self._test_arg('ostenantname', 'foo')
- def test_argparser_ospassword(self):
- self._test_argparser('ospassword', 'foo')
+ def test_ospassword(self):
+ self._test_arg('ospassword', 'foo')
- def test_argparser_odlip(self):
- self._test_argparser('odlip', '127.0.0.4')
+ def test_odlip(self):
+ self._test_arg('odlip', '127.0.0.4')
- def test_argparser_odlwebport(self):
- self._test_argparser('odlwebport', '80')
+ def test_odlwebport(self):
+ self._test_arg('odlwebport', '80')
- def test_argparser_odlrestconfport(self):
- self._test_argparser('odlrestconfport', '80')
+ def test_odlrestconfport(self):
+ self._test_arg('odlrestconfport', '80')
- def test_argparser_pushtodb(self):
+ def test_pushtodb(self):
self.defaultargs['pushtodb'] = True
- parser = odl.ODLParser()
- self.assertEqual(parser.parse_args(["--{}".format('pushtodb')]),
+ self.assertEqual(self.parser.parse_args(["--{}".format('pushtodb')]),
self.defaultargs)
- def test_argparser_multiple_args(self):
+ def test_multiple_args(self):
self.defaultargs['neutronip'] = self._neutron_ip
self.defaultargs['odlip'] = self._sdn_controller_ip
- parser = odl.ODLParser()
- self.assertEqual(parser.parse_args(
- ["--neutronip={}".format(self._neutron_ip),
- "--odlip={}".format(self._sdn_controller_ip)
- ]), self.defaultargs)
+ self.assertEqual(
+ self.parser.parse_args(
+ ["--neutronip={}".format(self._neutron_ip),
+ "--odlip={}".format(self._sdn_controller_ip)]),
+ self.defaultargs)
if __name__ == "__main__":
diff --git a/functest/tests/unit/opnfv_tests/openstack/tempest/test_conf_utils.py b/functest/tests/unit/opnfv_tests/openstack/tempest/test_conf_utils.py
index caf21925a..8ca5cc5b6 100644
--- a/functest/tests/unit/opnfv_tests/openstack/tempest/test_conf_utils.py
+++ b/functest/tests/unit/opnfv_tests/openstack/tempest/test_conf_utils.py
@@ -37,7 +37,6 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
self.assertTrue(msg in context)
def test_create_tempest_resources_missing_image(self):
- CONST.tempest_use_custom_images = 'test_image'
with mock.patch('functest.opnfv_tests.openstack.tempest.conf_utils.'
'os_utils.get_keystone_client',
return_value=mock.Mock()), \
@@ -54,13 +53,18 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
'os_utils.get_or_create_image',
return_value=(mock.Mock(), None)), \
self.assertRaises(Exception) as context:
+
+ CONST.tempest_use_custom_images = True
conf_utils.create_tempest_resources()
msg = 'Failed to create image'
self.assertTrue(msg in context)
+ CONST.tempest_use_custom_images = False
+ conf_utils.create_tempest_resources(use_custom_images=True)
+ msg = 'Failed to create image'
+ self.assertTrue(msg in context)
+
def test_create_tempest_resources_missing_flavor(self):
- CONST.tempest_use_custom_images = 'test_image'
- CONST.tempest_use_custom_flavors = 'test_flavour'
with mock.patch('functest.opnfv_tests.openstack.tempest.conf_utils.'
'os_utils.get_keystone_client',
return_value=mock.Mock()), \
@@ -80,10 +84,18 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
'os_utils.get_or_create_flavor',
return_value=(mock.Mock(), None)), \
self.assertRaises(Exception) as context:
+ CONST.tempest_use_custom_images = True
+ CONST.tempest_use_custom_flavors = True
conf_utils.create_tempest_resources()
msg = 'Failed to create flavor'
self.assertTrue(msg in context)
+ CONST.tempest_use_custom_images = True
+ CONST.tempest_use_custom_flavors = False
+ conf_utils.create_tempest_resources(use_custom_flavors=False)
+ msg = 'Failed to create flavor'
+ self.assertTrue(msg in context)
+
def test_get_verifier_id_missing_verifier(self):
CONST.tempest_deployment_name = 'test_deploy_name'
with mock.patch('functest.opnfv_tests.openstack.tempest.'
@@ -153,6 +165,210 @@ class OSTempestConfUtilsTesting(unittest.TestCase):
self.assertTrue(m1.called)
self.assertTrue(m2.called)
+ def test_get_repo_tag_default(self):
+ mock_popen = mock.Mock()
+ attrs = {'stdout.readline.return_value': 'test_tag'}
+ mock_popen.configure_mock(**attrs)
+
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.subprocess.Popen',
+ return_value=mock_popen):
+ self.assertEqual(conf_utils.get_repo_tag('test_repo'),
+ 'test_tag')
+
+ def test_backup_tempest_config_default(self):
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.os.path.exists',
+ return_value=False), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.os.makedirs') as m1, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.shutil.copyfile') as m2:
+ conf_utils.backup_tempest_config('test_conf_file')
+ self.assertTrue(m1.called)
+ self.assertTrue(m2.called)
+
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.os.path.exists',
+ return_value=True), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.shutil.copyfile') as m2:
+ conf_utils.backup_tempest_config('test_conf_file')
+ self.assertTrue(m2.called)
+
+ def test_configure_tempest_default(self):
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_verifier',
+ return_value='test_conf_file'), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_tempest_update_params') as m1, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_tempest_multisite_params') as m2:
+ conf_utils.configure_tempest('test_dep_dir',
+ MODE='feature_multisite')
+ self.assertTrue(m1.called)
+ self.assertTrue(m2.called)
+
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_verifier',
+ return_value='test_conf_file'), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_tempest_update_params') as m1:
+ conf_utils.configure_tempest('test_dep_dir')
+ self.assertTrue(m1.called)
+ self.assertTrue(m2.called)
+
+ def test_configure_tempest_defcore_default(self):
+ img_flavor_dict = {'image_id': 'test_image_id',
+ 'flavor_id': 'test_flavor_id',
+ 'image_id_alt': 'test_image_alt_id',
+ 'flavor_id_alt': 'test_flavor_alt_id'}
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_verifier',
+ return_value='test_conf_file'), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.configure_tempest_update_params'), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'set') as mset, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'read') as mread, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'write') as mwrite, \
+ mock.patch('__builtin__.open', mock.mock_open()), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.shutil.copyfile'):
+ CONST.dir_functest_test = 'test_dir'
+ CONST.refstack_tempest_conf_path = 'test_path'
+ conf_utils.configure_tempest_defcore('test_dep_dir',
+ img_flavor_dict)
+ mset.assert_any_call('compute', 'image_ref', 'test_image_id')
+ mset.assert_any_call('compute', 'image_ref_alt',
+ 'test_image_alt_id')
+ mset.assert_any_call('compute', 'flavor_ref', 'test_flavor_id')
+ mset.assert_any_call('compute', 'flavor_ref_alt',
+ 'test_flavor_alt_id')
+ self.assertTrue(mread.called)
+ self.assertTrue(mwrite.called)
+
+ def _test_missing_param(self, params, image_id, flavor_id):
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'set') as mset, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'read') as mread, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'write') as mwrite, \
+ mock.patch('__builtin__.open', mock.mock_open()), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.backup_tempest_config'):
+ CONST.dir_functest_test = 'test_dir'
+ CONST.OS_ENDPOINT_TYPE = None
+ conf_utils.\
+ configure_tempest_update_params('test_conf_file',
+ IMAGE_ID=image_id,
+ FLAVOR_ID=flavor_id)
+ mset.assert_any_call(params[0], params[1], params[2])
+ self.assertTrue(mread.called)
+ self.assertTrue(mwrite.called)
+
+ def test_configure_tempest_update_params_missing_image_id(self):
+ CONST.tempest_use_custom_images = True
+ self._test_missing_param(('compute', 'image_ref',
+ 'test_image_id'), 'test_image_id',
+ None)
+
+ def test_configure_tempest_update_params_missing_image_id_alt(self):
+ CONST.tempest_use_custom_images = True
+ conf_utils.IMAGE_ID_ALT = 'test_image_id_alt'
+ self._test_missing_param(('compute', 'image_ref_alt',
+ 'test_image_id_alt'), None, None)
+
+ def test_configure_tempest_update_params_missing_flavor_id(self):
+ CONST.tempest_use_custom_flavors = True
+ self._test_missing_param(('compute', 'flavor_ref',
+ 'test_flavor_id'), None,
+ 'test_flavor_id')
+
+ def test_configure_tempest_update_params_missing_flavor_id_alt(self):
+ CONST.tempest_use_custom_flavors = True
+ conf_utils.FLAVOR_ID_ALT = 'test_flavor_id_alt'
+ self._test_missing_param(('compute', 'flavor_ref_alt',
+ 'test_flavor_id_alt'), None,
+ None)
+
+ def test_configure_verifier_missing_temp_conf_file(self):
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.os.path.isfile',
+ return_value=False), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ft_utils.execute_command') as mexe, \
+ self.assertRaises(Exception) as context:
+ conf_utils.configure_verifier('test_dep_dir')
+ mexe.assert_any_call("rally verify configure-verifier")
+ msg = ("Tempest configuration file 'test_dep_dir/tempest.conf'"
+ " NOT found.")
+ self.assertTrue(msg in context)
+
+ def test_configure_verifier_default(self):
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.os.path.isfile',
+ return_value=True), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ft_utils.execute_command') as mexe:
+ self.assertEqual(conf_utils.configure_verifier('test_dep_dir'),
+ 'test_dep_dir/tempest.conf')
+ mexe.assert_any_call("rally verify configure-verifier "
+ "--reconfigure")
+
+ def test_configure_tempest_multisite_params_without_fuel(self):
+ conf_utils.CI_INSTALLER_TYPE = 'not_fuel'
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.os_utils.get_endpoint',
+ return_value='kingbird_endpoint_url'), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'set') as mset, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'read') as mread, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'add_section') as msection, \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ConfigParser.RawConfigParser.'
+ 'write') as mwrite, \
+ mock.patch('__builtin__.open', mock.mock_open()), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.backup_tempest_config'):
+
+ conf_utils.configure_tempest_multisite_params('test_conf_file')
+ msection.assert_any_call("kingbird")
+ mset.assert_any_call('service_available', 'kingbird', 'true')
+ mset.assert_any_call('kingbird', 'endpoint_type', 'publicURL')
+ mset.assert_any_call('kingbird', 'TIME_TO_SYNC', '120')
+ mset.assert_any_call('kingbird', 'endpoint_url',
+ 'kingbird_endpoint_url')
+ self.assertTrue(mread.called)
+ self.assertTrue(mwrite.called)
+
+ def test_install_verifier_ext_default(self):
+ with mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.get_repo_tag',
+ return_value='test_tag'), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.'
+ 'conf_utils.ft_utils.'
+ 'execute_command_raise') as mexe:
+ conf_utils.install_verifier_ext('test_path')
+ cmd = ("rally verify add-verifier-ext --source test_path "
+ "--version test_tag")
+ error_msg = ("Problem while adding verifier extension from"
+ " test_path")
+ mexe.assert_called_once_with(cmd, error_msg=error_msg)
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/opnfv_tests/openstack/tempest/test_tempest.py b/functest/tests/unit/opnfv_tests/openstack/tempest/test_tempest.py
index 398c53bf3..34031b404 100644
--- a/functest/tests/unit/opnfv_tests/openstack/tempest/test_tempest.py
+++ b/functest/tests/unit/opnfv_tests/openstack/tempest/test_tempest.py
@@ -13,6 +13,7 @@ import mock
from functest.core import testcase
from functest.opnfv_tests.openstack.tempest import tempest
from functest.opnfv_tests.openstack.tempest import conf_utils
+from functest.utils.constants import CONST
class OSTempestTesting(unittest.TestCase):
@@ -33,6 +34,14 @@ class OSTempestTesting(unittest.TestCase):
'conf_utils.get_verifier_deployment_dir',
return_value='test_verifier_deploy_dir'):
self.tempestcommon = tempest.TempestCommon()
+ self.tempestsmoke_serial = tempest.TempestSmokeSerial()
+ self.tempestsmoke_parallel = tempest.TempestSmokeParallel()
+ self.tempestfull_parallel = tempest.TempestFullParallel()
+ with mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
+ 'conf_utils.install_verifier_ext'):
+ self.tempestmultisite = tempest.TempestMultisite()
+ self.tempestcustom = tempest.TempestCustom()
+ self.tempestdefcore = tempest.TempestDefcore()
@mock.patch('functest.opnfv_tests.openstack.tempest.tempest.logger.debug')
def test_generate_test_list_defcore_mode(self, mock_logger_debug):
@@ -100,6 +109,48 @@ class OSTempestTesting(unittest.TestCase):
with self.assertRaises(Exception):
self.tempestcommon.parse_verifier_result()
+ def test_apply_tempest_blacklist_no_blacklist(self):
+ with mock.patch('__builtin__.open', mock.mock_open()) as m, \
+ mock.patch.object(self.tempestcommon, 'read_file',
+ return_value=['test1', 'test2']):
+ conf_utils.TEMPEST_BLACKLIST = Exception
+ CONST.INSTALLER_TYPE = 'installer_type'
+ CONST.DEPLOY_SCENARIO = 'deploy_scenario'
+ self.tempestcommon.apply_tempest_blacklist()
+ obj = m()
+ obj.write.assert_any_call('test1\n')
+ obj.write.assert_any_call('test2\n')
+
+ def test_apply_tempest_blacklist_default(self):
+ item_dict = {'scenarios': ['deploy_scenario'],
+ 'installers': ['installer_type'],
+ 'tests': ['test2']}
+ with mock.patch('__builtin__.open', mock.mock_open()) as m, \
+ mock.patch.object(self.tempestcommon, 'read_file',
+ return_value=['test1', 'test2']), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
+ 'yaml.safe_load', return_value=item_dict):
+ CONST.INSTALLER_TYPE = 'installer_type'
+ CONST.DEPLOY_SCENARIO = 'deploy_scenario'
+ self.tempestcommon.apply_tempest_blacklist()
+ obj = m()
+ obj.write.assert_any_call('test1\n')
+ self.assertFalse(obj.write.assert_any_call('test2\n'))
+
+ @mock.patch('functest.opnfv_tests.openstack.tempest.tempest.logger.info')
+ def test_run_verifier_tests_default(self, mock_logger_info):
+ with mock.patch('__builtin__.open', mock.mock_open()), \
+ mock.patch('__builtin__.iter', return_value=['\} tempest\.']), \
+ mock.patch('functest.opnfv_tests.openstack.tempest.tempest.'
+ 'subprocess.Popen'):
+ conf_utils.TEMPEST_LIST = 'test_tempest_list'
+ cmd_line = ("rally verify start --load-list "
+ "test_tempest_list --detailed")
+ self.tempestcommon.run_verifier_tests()
+ mock_logger_info. \
+ assert_any_call("Starting Tempest test suite: '%s'."
+ % cmd_line)
+
@mock.patch('functest.opnfv_tests.openstack.tempest.tempest.logger.info')
def test_parse_verifier_result_default(self, mock_logger_info):
self.tempestcommon.VERIFICATION_ID = 'test_uuid'
diff --git a/functest/tests/unit/opnfv_tests/vnf/ims/test_clearwater.py b/functest/tests/unit/opnfv_tests/vnf/ims/test_clearwater.py
index 527f12e59..18bebfdff 100644
--- a/functest/tests/unit/opnfv_tests/vnf/ims/test_clearwater.py
+++ b/functest/tests/unit/opnfv_tests/vnf/ims/test_clearwater.py
@@ -39,16 +39,48 @@ class ClearwaterTesting(unittest.TestCase):
with mock.patch.object(self.clearwater.orchestrator,
'download_upload_and_deploy_blueprint',
return_value=''):
- self.clearwater.deploy_vnf(self.bp),
+ self.clearwater.deploy_vnf(self.bp)
self.assertEqual(self.clearwater.deploy, True)
def test_undeploy_vnf_deployment_passed(self):
with mock.patch.object(self.clearwater.orchestrator,
'undeploy_deployment'):
self.clearwater.deploy = True
+ self.clearwater.undeploy_vnf()
+ self.assertEqual(self.clearwater.deploy, False)
+
+ def test_undeploy_vnf_deployment_with_undeploy(self):
+ with mock.patch.object(self.clearwater.orchestrator,
+ 'undeploy_deployment') as m:
+ self.clearwater.deploy = False
+ self.clearwater.undeploy_vnf(),
+ self.assertEqual(self.clearwater.deploy, False)
+ self.assertFalse(m.called)
+
+ self.clearwater.orchestrator = None
+ self.clearwater.deploy = True
+ self.clearwater.undeploy_vnf(),
+ self.assertEqual(self.clearwater.deploy, True)
+
+ self.clearwater.deploy = False
self.clearwater.undeploy_vnf(),
self.assertEqual(self.clearwater.deploy, False)
+ def test_set_methods(self):
+ self.clearwater.set_orchestrator(self.orchestrator)
+ self.assertTrue(self.clearwater.orchestrator, self.orchestrator)
+ self.clearwater.set_flavor_id('test_flavor_id')
+ self.assertTrue(self.clearwater.config['flavor_id'], 'test_flavor_id')
+ self.clearwater.set_image_id('test_image_id')
+ self.assertTrue(self.clearwater.config['image_id'], 'test_image_id')
+ self.clearwater.set_agent_user('test_user')
+ self.assertTrue(self.clearwater.config['agent_user'], 'test_user')
+ self.clearwater.set_external_network_name('test_network')
+ self.assertTrue(self.clearwater.config['external_network_name'],
+ 'test_network')
+ self.clearwater.set_public_domain('test_domain')
+ self.assertTrue(self.clearwater.config['public_domain'],
+ 'test_domain')
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/opnfv_tests/vnf/ims/test_cloudify_ims.py b/functest/tests/unit/opnfv_tests/vnf/ims/test_cloudify_ims.py
index e25816f01..f47ea865e 100644
--- a/functest/tests/unit/opnfv_tests/vnf/ims/test_cloudify_ims.py
+++ b/functest/tests/unit/opnfv_tests/vnf/ims/test_cloudify_ims.py
@@ -13,7 +13,7 @@ import mock
from functest.opnfv_tests.vnf.ims import cloudify_ims
-class ImsVnfTesting(unittest.TestCase):
+class CloudifyImsTesting(unittest.TestCase):
logging.disable(logging.CRITICAL)
@@ -22,7 +22,7 @@ class ImsVnfTesting(unittest.TestCase):
'os.makedirs'), \
mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
'get_config', return_value='config_value'):
- self.ims_vnf = cloudify_ims.ImsVnf()
+ self.ims_vnf = cloudify_ims.CloudifyIms()
self.neutron_client = mock.Mock()
self.glance_client = mock.Mock()
self.keystone_client = mock.Mock()
@@ -419,94 +419,46 @@ class ImsVnfTesting(unittest.TestCase):
self.ims_vnf.test_vnf()
self.assertTrue(msg in context.exception)
- def test_test_vnf_create_number_failure(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.popen') as m, \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'requests.get'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'requests.post',
- return_value=self.mock_post), \
- self.assertRaises(Exception) as context:
- mock_obj = mock.Mock()
- attrs = {'read.return_value': 'test_ip\n'}
- mock_obj.configure_mock(**attrs)
- m.return_value = mock_obj
-
- self.ims_vnf.test_vnf()
-
- msg = "Unable to create a number:"
- self.assertTrue(msg in context.exception)
-
- def _get_post_status(self, url, cookies='', data=''):
- ellis_url = "http://test_ellis_ip/session"
- if url == ellis_url:
- return self.mock_post_200
- return self.mock_post
-
def test_test_vnf_fail(self):
with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.popen') as m, \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'requests.get') as mock_get, \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'requests.post',
- side_effect=self._get_post_status), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'ft_utils.get_resolvconf_ns'), \
- mock.patch('__builtin__.open', mock.mock_open()), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'subprocess.call'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.remove'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'json.load', return_value=''):
- mock_obj = mock.Mock()
- attrs = {'read.return_value': 'test_ip\n'}
- mock_obj.configure_mock(**attrs)
- m.return_value = mock_obj
+ 'os.popen'), \
+ mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
+ 'requests.get') as mock_get, \
+ mock.patch.object(self.ims_vnf, 'config_ellis'), \
+ mock.patch.object(self.ims_vnf,
+ 'run_clearwater_live_test') as clearwater_obj:
+ clearwater_obj.return_value = ''
- mock_obj2 = mock.Mock()
- attrs = {'json.return_value': {'outputs':
- {'dns_ip': 'test_dns_ip',
- 'ellis_ip': 'test_ellis_ip'}}}
- mock_obj2.configure_mock(**attrs)
- mock_get.return_value = mock_obj2
+ mock_obj2 = mock.Mock()
+ attrs = {'json.return_value': {'outputs':
+ {'dns_ip': 'test_dns_ip',
+ 'ellis_ip': 'test_ellis_ip'}}}
+ mock_obj2.configure_mock(**attrs)
+ mock_get.return_value = mock_obj2
- self.assertEqual(self.ims_vnf.test_vnf(),
- {'status': 'FAIL', 'result': ''})
+ self.assertEqual(self.ims_vnf.test_vnf(),
+ {'status': 'FAIL', 'result': ''})
def test_test_vnf_pass(self):
with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.popen') as m, \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'requests.get') as mock_get, \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'requests.post',
- side_effect=self._get_post_status), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'ft_utils.get_resolvconf_ns'), \
- mock.patch('__builtin__.open', mock.mock_open()), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'subprocess.call'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.remove'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'json.load', return_value='vims_test_result'):
- mock_obj = mock.Mock()
- attrs = {'read.return_value': 'test_ip\n'}
- mock_obj.configure_mock(**attrs)
- m.return_value = mock_obj
-
- mock_obj2 = mock.Mock()
- attrs = {'json.return_value': {'outputs':
- {'dns_ip': 'test_dns_ip',
- 'ellis_ip': 'test_ellis_ip'}}}
- mock_obj2.configure_mock(**attrs)
- mock_get.return_value = mock_obj2
-
- self.assertEqual(self.ims_vnf.test_vnf(),
- {'status': 'PASS', 'result': 'vims_test_result'})
+ 'os.popen'), \
+ mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
+ 'requests.get') as mock_get, \
+ mock.patch.object(self.ims_vnf, 'config_ellis'), \
+ mock.patch.object(self.ims_vnf,
+ 'run_clearwater_live_test') as clearwater_obj:
+ clearwater_obj.return_value = 'vims_test_result'
+
+ mock_obj2 = mock.Mock()
+ attrs = {'json.return_value': {'outputs':
+ {'dns_ip': 'test_dns_ip',
+ 'ellis_ip': 'test_ellis_ip'}}}
+ mock_obj2.configure_mock(**attrs)
+ mock_get.return_value = mock_obj2
+
+ self.assertEqual(self.ims_vnf.test_vnf(),
+ {'status': 'PASS',
+ 'result': 'vims_test_result'})
def test_download_and_add_image_on_glance_incorrect_url(self):
with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
diff --git a/functest/tests/unit/opnfv_tests/vnf/ims/test_ims_base.by b/functest/tests/unit/opnfv_tests/vnf/ims/test_ims_base.by
new file mode 100644
index 000000000..9440bcdf3
--- /dev/null
+++ b/functest/tests/unit/opnfv_tests/vnf/ims/test_ims_base.by
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import logging
+import unittest
+
+import mock
+
+from functest.opnfv_tests.vnf.ims import ims_base
+
+
+class ClearwaterOnBoardingBaseTesting(unittest.TestCase):
+
+ logging.disable(logging.CRITICAL)
+
+ def setUp(self):
+ with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
+ 'os.makedirs'):
+ self.ims_vnf = ims_base.ClearwaterOnBoardingBase()
+
+ self.mock_post = mock.Mock()
+ attrs = {'status_code': 201,
+ 'cookies': ""}
+ self.mock_post.configure_mock(**attrs)
+
+ self.mock_post_200 = mock.Mock()
+ attrs = {'status_code': 200,
+ 'cookies': ""}
+ self.mock_post_200.configure_mock(**attrs)
+
+ self.mock_post_500 = mock.Mock()
+ attrs = {'status_code': 500,
+ 'cookies': ""}
+ self.mock_post_200.configure_mock(**attrs)
+
+ def test_create_ellis_number_failure(self):
+ with mock.patch('functest.opnfv_tests.vnf.ims.ims_base.'
+ 'requests.post',
+ return_value=self.mock_post_500), \
+ self.assertRaises(Exception) as context:
+ self.ims_vnf.create_ellis_number()
+
+ msg = "Unable to create a number:"
+ self.assertTrue(msg in context.exception)
+
+ def _get_post_status(self, url, cookies='', data=''):
+ ellis_url = "http://test_ellis_ip/session"
+ if url == ellis_url:
+ return self.mock_post_200
+ return self.mock_post
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/opnfv_tests/vnf/ims/test_orchestrator_cloudify.py b/functest/tests/unit/opnfv_tests/vnf/ims/test_orchestrator_cloudify.py
index 620b0216f..bf6d483f7 100644
--- a/functest/tests/unit/opnfv_tests/vnf/ims/test_orchestrator_cloudify.py
+++ b/functest/tests/unit/opnfv_tests/vnf/ims/test_orchestrator_cloudify.py
@@ -6,6 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
import logging
+import subprocess32 as subprocess
import unittest
import mock
@@ -117,6 +118,60 @@ class ImsVnfTesting(unittest.TestCase):
'dest'),
True)
+ def test_execute_command_failed(self):
+ with mock.patch('__builtin__.open',
+ mock.mock_open(read_data='test_data\n')):
+ subprocess.call = mock.create_autospec(subprocess.call,
+ return_value=0)
+ mock_log = mock.Mock()
+ cmd = 'test_cmd -e test_env bash_script'
+ ret = orchestrator_cloudify.execute_command(cmd, mock_log,
+ timeout=100)
+ self.assertEqual(ret, False)
+
+ def test_execute_command_default(self):
+ with mock.patch('__builtin__.open',
+ mock.mock_open(read_data='test_data\n')):
+ subprocess.call = mock. \
+ create_autospec(subprocess.call,
+ return_value=subprocess.TimeoutExpired)
+ mock_log = mock.Mock()
+ cmd = 'test_cmd -e test_env bash_script'
+ ret = orchestrator_cloudify.execute_command(cmd, mock_log,
+ timeout=100)
+ self.assertEqual(ret, ['test_data\n'])
+
+ def test_set_methods(self):
+ self.orchestrator.set_credentials('test_username', 'test_password',
+ 'test_tenant_name', 'test_auth_url')
+ self.assertTrue(self.orchestrator.config['keystone_username'],
+ 'test_username')
+ self.assertTrue(self.orchestrator.config['keystone_password'],
+ 'test_password')
+ self.assertTrue(self.orchestrator.config['keystone_url'],
+ 'test_auth_url')
+ self.assertTrue(self.orchestrator.config['keystone_tenant_name'],
+ 'test_tenant_name')
+ self.orchestrator.set_flavor_id('test_flavor_id')
+ self.assertTrue(self.orchestrator.config['flavor_id'],
+ 'test_flavor_id')
+ self.orchestrator.set_image_id('test_image_id')
+ self.assertTrue(self.orchestrator.config['image_id'], 'test_image_id')
+ self.orchestrator.set_external_network_name('test_network')
+ self.assertTrue(self.orchestrator.config['external_network_name'],
+ 'test_network')
+ self.orchestrator.set_ssh_user('test_user')
+ self.assertTrue(self.orchestrator.config['ssh_user'],
+ 'test_user')
+ self.orchestrator.set_nova_url('test_nova_url')
+ self.assertTrue(self.orchestrator.config['nova_url'],
+ 'test_nova_url')
+ self.orchestrator.set_neutron_url('test_neutron_url')
+ self.assertTrue(self.orchestrator.config['neutron_url'],
+ 'test_neutron_url')
+ self.orchestrator.set_nameservers(['test_subnet'])
+ self.assertTrue(self.orchestrator.config['dns_subnet_1'],
+ 'test_subnet')
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_functest_logger.py b/functest/tests/unit/utils/test_functest_logger.py
new file mode 100644
index 000000000..42e41a141
--- /dev/null
+++ b/functest/tests/unit/utils/test_functest_logger.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import logging
+import unittest
+
+import mock
+
+from functest.utils import functest_logger
+from functest.utils.constants import CONST
+
+
+class OSUtilsLogger(unittest.TestCase):
+
+ logging.disable(logging.CRITICAL)
+
+ def setUp(self):
+ with mock.patch('__builtin__.open', mock.mock_open()):
+ with mock.patch('functest.utils.functest_logger.os.path.exists',
+ return_value=True), \
+ mock.patch('functest.utils.functest_logger.'
+ 'json.load'), \
+ mock.patch('functest.utils.functest_logger.'
+ 'logging.config.dictConfig') as m:
+ self.logger = functest_logger.Logger('os_utils')
+ self.assertTrue(m.called)
+ with mock.patch('functest.utils.functest_logger.os.path.exists',
+ return_value=False), \
+ mock.patch('functest.utils.functest_logger.'
+ 'logging.basicConfig') as m:
+ self.logger = functest_logger.Logger('os_utils')
+ self.assertTrue(m.called)
+
+ def test_is_debug_false(self):
+ CONST.CI_DEBUG = False
+ self.assertFalse(self.logger.is_debug())
+
+ def test_is_debug_true(self):
+ CONST.CI_DEBUG = "True"
+ self.assertTrue(self.logger.is_debug())
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_openstack_tacker.py b/functest/tests/unit/utils/test_openstack_tacker.py
index dc7172580..37d77a18f 100644
--- a/functest/tests/unit/utils/test_openstack_tacker.py
+++ b/functest/tests/unit/utils/test_openstack_tacker.py
@@ -6,9 +6,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
import logging
-import mock
import unittest
+import mock
+from tackerclient.v1_0 import client as tackerclient
+
from functest.utils import openstack_tacker
from functest.tests.unit import test_utils
@@ -56,6 +58,13 @@ class OSTackerTesting(unittest.TestCase):
}
return cred_dict
+ def test_get_tacker_client(self):
+ with mock.patch('functest.utils.openstack_tacker.'
+ 'os_utils.get_session'):
+ tackerclient.Client = mock.Mock
+ ret = openstack_tacker.get_tacker_client()
+ self.assertTrue(isinstance(ret, mock.Mock))
+
def test_get_id_from_name(self):
with mock.patch.object(self.tacker_client, 'get',
return_value=self.getresponse):
@@ -183,8 +192,16 @@ class OSTackerTesting(unittest.TestCase):
vnfd_name=self.vnfd)
self.assertEqual(resp, self.vnfd)
- # TODO: Exception('You need to provide an VNFD'
- # 'id or name') AssertionError
+ def test_delete_vnfd_missing_vnfd_name(self):
+ with mock.patch('functest.utils.openstack_tacker.get_vnfd_id',
+ return_value=self.vnfd), \
+ self.assertRaises(Exception) as context:
+ resp = openstack_tacker.delete_vnfd(self.tacker_client,
+ vnfd_id=None,
+ vnfd_name=None)
+ self.assertIsNone(resp)
+ msg = 'You need to provide VNFD id or VNFD name'
+ self.assertTrue(msg in context)
@mock.patch('functest.utils.openstack_tacker.logger.error')
def test_delete_vnfd_exception(self, mock_logger_error):
@@ -253,7 +270,47 @@ class OSTackerTesting(unittest.TestCase):
"client"))
self.assertIsNone(resp)
- # TODO: wait_for_vnf
+ def test_wait_for_vnf_vnf_retrieval_failed(self):
+ with mock.patch('functest.utils.openstack_tacker.get_vnf',
+ return_value=None), \
+ self.assertRaises(Exception) as context:
+ openstack_tacker.wait_for_vnf(self.tacker_client,
+ vnf_id='vnf_id',
+ vnf_name='vnf_name')
+ msg = ("Could not retrieve VNF - id='vnf_id', "
+ "name='vnf_name'")
+ self.assertTrue(msg in context)
+ with mock.patch('functest.utils.openstack_tacker.get_vnf',
+ side_effect=Exception):
+ ret = openstack_tacker.wait_for_vnf(self.tacker_client,
+ vnf_id='vnf_id',
+ vnf_name='vnf_name')
+ self.assertEqual(ret, None)
+
+ def test_wait_for_vnf_vnf_status_error(self):
+ vnf = {'id': 'vnf_id',
+ 'status': 'ERROR'}
+ with mock.patch('functest.utils.openstack_tacker.get_vnf',
+ return_value=vnf), \
+ self.assertRaises(Exception) as context:
+ openstack_tacker.wait_for_vnf(self.tacker_client,
+ vnf_id='vnf_id',
+ vnf_name='vnf_name')
+ msg = ('Error when booting vnf vnf_id')
+ self.assertTrue(msg in context)
+
+ def test_wait_for_vnf_vnf_timeout(self):
+ vnf = {'id': 'vnf_id',
+ 'status': 'PENDING_CREATE'}
+ with mock.patch('functest.utils.openstack_tacker.get_vnf',
+ return_value=vnf), \
+ self.assertRaises(Exception) as context:
+ openstack_tacker.wait_for_vnf(self.tacker_client,
+ vnf_id='vnf_id',
+ vnf_name='vnf_name',
+ timeout=2)
+ msg = ('Timeout when booting vnf vnf_id')
+ self.assertTrue(msg in context)
def test_delete_vnf(self):
with mock.patch('functest.utils.openstack_tacker.get_vnf_id',
@@ -265,8 +322,13 @@ class OSTackerTesting(unittest.TestCase):
vnf_name=self.vnf)
self.assertEqual(resp, self.vnf)
- # TODO: Exception('You need to provide an VNF'
- # 'classifier id or name') AssertionError
+ def test_delete_vnf_missing_vnf_name(self):
+ with self.assertRaises(Exception) as context:
+ openstack_tacker.delete_vnf(self.tacker_client,
+ vnf_id=None,
+ vnf_name=None)
+ msg = 'You need to provide a VNF id or name'
+ self.assertTrue(msg in context)
@mock.patch('functest.utils.openstack_tacker.logger.error')
def test_delete_vnf_exception(self, mock_logger_error):
@@ -345,8 +407,13 @@ class OSTackerTesting(unittest.TestCase):
sfc_name=self.sfc)
self.assertEqual(resp, self.sfc)
- # TODO: Exception('You need to provide an SFC'
- # 'id or name') AssertionError
+ def test_delete_sfc_missing_sfc_name(self):
+ with self.assertRaises(Exception) as context:
+ openstack_tacker.delete_sfc(self.tacker_client,
+ sfc_id=None,
+ sfc_name=None)
+ msg = 'You need to provide an SFC id or name'
+ self.assertTrue(msg in context)
@mock.patch('functest.utils.openstack_tacker.logger.error')
def test_delete_sfc_exception(self, mock_logger_error):
@@ -431,8 +498,13 @@ class OSTackerTesting(unittest.TestCase):
sfc_clf_name=cl)
self.assertEqual(resp, cl)
- # TODO: Exception('You need to provide an SFC'
- # 'classifier id or name') AssertionError
+ def test_delete_sfc_classifier_missing_sfc_name(self):
+ with self.assertRaises(Exception) as context:
+ openstack_tacker.delete_vnf(self.tacker_client,
+ sfc_clf_id=None,
+ sfc_clf_name=None)
+ msg = 'You need to provide an SFCclassifier id or name'
+ self.assertTrue(msg in context)
@mock.patch('functest.utils.openstack_tacker.logger.error')
def test_delete_sfc_classifier_exception(self, mock_logger_error):
diff --git a/functest/tests/unit/utils/test_openstack_utils.py b/functest/tests/unit/utils/test_openstack_utils.py
index 673ad5e20..7f3995d03 100644
--- a/functest/tests/unit/utils/test_openstack_utils.py
+++ b/functest/tests/unit/utils/test_openstack_utils.py
@@ -229,6 +229,12 @@ class OSUtilsTesting(unittest.TestCase):
self.sec_group = {'id': 'sec_group_id',
'name': 'test_sec_group'}
+ self.sec_group_rule = {'id': 'sec_group_rule_id',
+ 'direction': 'direction',
+ 'protocol': 'protocol',
+ 'port_range_max': 'port_max',
+ 'security_group_id': self.sec_group['id'],
+ 'port_range_min': 'port_min'}
self.neutron_floatingip = {'id': 'fip_id',
'floating_ip_address': 'test_ip'}
self.neutron_client = mock.Mock()
@@ -260,6 +266,9 @@ class OSUtilsTesting(unittest.TestCase):
'show_bgpvpn.return_value': self.mock_return,
'list_security_groups.return_value': {'security_groups':
[self.sec_group]},
+ 'list_security_group_rules.'
+ 'return_value': {'security_group_rules':
+ [self.sec_group_rule]},
'create_security_group_rule.return_value': mock.Mock(),
'create_security_group.return_value': {'security_group':
self.sec_group},
@@ -1247,6 +1256,52 @@ class OSUtilsTesting(unittest.TestCase):
'test_sec_group'),
'sec_group_id')
+ def test_get_security_group_rules_default(self):
+ self.assertEqual(openstack_utils.
+ get_security_group_rules(self.neutron_client,
+ self.sec_group['id']),
+ [self.sec_group_rule])
+
+ @mock.patch('functest.utils.openstack_utils.logger.error')
+ def test_get_security_group_rules_exception(self, mock_logger_error):
+ self.assertEqual(openstack_utils.
+ get_security_group_rules(Exception,
+ 'sec_group_id'),
+ None)
+ self.assertTrue(mock_logger_error.called)
+
+ def test_check_security_group_rules_not_exists(self):
+ self.assertEqual(openstack_utils.
+ check_security_group_rules(self.neutron_client,
+ 'sec_group_id_2',
+ 'direction',
+ 'protocol',
+ 'port_min',
+ 'port_max'),
+ True)
+
+ def test_check_security_group_rules_exists(self):
+ self.assertEqual(openstack_utils.
+ check_security_group_rules(self.neutron_client,
+ self.sec_group['id'],
+ 'direction',
+ 'protocol',
+ 'port_min',
+ 'port_max'),
+ False)
+
+ @mock.patch('functest.utils.openstack_utils.logger.error')
+ def test_check_security_group_rules_exception(self, mock_logger_error):
+ self.assertEqual(openstack_utils.
+ check_security_group_rules(Exception,
+ 'sec_group_id',
+ 'direction',
+ 'protocol',
+ 'port_max',
+ 'port_min'),
+ None)
+ self.assertTrue(mock_logger_error.called)
+
def test_create_security_group_default(self):
self.assertEqual(openstack_utils.
create_security_group(self.neutron_client,
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
index ffc870f62..4663f7ba6 100644
--- a/functest/utils/openstack_utils.py
+++ b/functest/utils/openstack_utils.py
@@ -1054,6 +1054,40 @@ def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
return False
+def get_security_group_rules(neutron_client, sg_id):
+ try:
+ security_rules = neutron_client.list_security_group_rules()[
+ 'security_group_rules']
+ security_rules = [rule for rule in security_rules
+ if rule["security_group_id"] == sg_id]
+ return security_rules
+ except Exception, e:
+ logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
+ " %s" % e)
+ return None
+
+
+def check_security_group_rules(neutron_client, sg_id, direction, protocol,
+ port_min=None, port_max=None):
+ try:
+ security_rules = get_security_group_rules(neutron_client, sg_id)
+ security_rules = [rule for rule in security_rules
+ if (rule["direction"].lower() == direction
+ and rule["protocol"].lower() == protocol
+ and rule["port_range_min"] == port_min
+ and rule["port_range_max"] == port_max)]
+ if len(security_rules) == 0:
+ return True
+ else:
+ return False
+ except Exception, e:
+ logger.error("Error [check_security_group_rules("
+ " neutron_client, sg_id, direction,"
+ " protocol, port_min=None, port_max=None)]: "
+ "%s" % e)
+ return None
+
+
def create_security_group_full(neutron_client,
sg_name, sg_description):
sg_id = get_security_group_id(neutron_client, sg_name)