summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/com/pres/framework/framework.md26
-rw-r--r--functest/ci/config_functest.yaml36
-rw-r--r--functest/ci/generate_report.py149
-rw-r--r--functest/ci/logging.ini7
-rwxr-xr-xfunctest/ci/run_tests.py60
-rw-r--r--functest/ci/testcases.yaml59
-rw-r--r--functest/ci/tier_builder.py11
-rw-r--r--functest/ci/tier_handler.py5
-rw-r--r--functest/core/feature.py1
-rw-r--r--functest/core/pytest_suite_runner.py25
-rw-r--r--functest/core/testcase.py31
-rw-r--r--functest/core/vnf.py (renamed from functest/core/vnf_base.py)4
-rw-r--r--functest/energy/__init__.py0
-rw-r--r--functest/energy/energy.py203
-rw-r--r--functest/opnfv_tests/openstack/snaps/health_check.py7
-rw-r--r--functest/opnfv_tests/openstack/snaps/smoke.py10
-rw-r--r--functest/opnfv_tests/openstack/snaps/snaps_test_runner.py7
-rw-r--r--functest/opnfv_tests/openstack/vping/vping_base.py411
-rwxr-xr-xfunctest/opnfv_tests/openstack/vping/vping_ssh.py278
-rwxr-xr-xfunctest/opnfv_tests/openstack/vping/vping_userdata.py134
-rwxr-xr-xfunctest/opnfv_tests/sdn/odl/odl.py19
-rwxr-xr-xfunctest/opnfv_tests/vnf/aaa/aaa.py4
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater_ims_base.py4
-rwxr-xr-xfunctest/opnfv_tests/vnf/ims/orchestra_ims.py6
-rw-r--r--functest/tests/unit/ci/test_generate_report.py129
-rw-r--r--functest/tests/unit/ci/test_run_tests.py31
-rw-r--r--functest/tests/unit/ci/test_tier_builder.py8
-rw-r--r--functest/tests/unit/ci/test_tier_handler.py5
-rw-r--r--functest/tests/unit/core/test_pytest_suite_runner.py51
-rw-r--r--functest/tests/unit/core/test_testcase.py38
-rw-r--r--functest/tests/unit/core/test_vnf.py168
-rw-r--r--functest/tests/unit/core/test_vnf_base.py52
-rw-r--r--functest/tests/unit/energy/__init__.py0
-rw-r--r--functest/tests/unit/energy/test_functest_energy.py277
-rw-r--r--functest/tests/unit/odl/test_odl.py47
-rw-r--r--functest/tests/unit/utils/test_decorators.py135
-rw-r--r--functest/tests/unit/utils/test_functest_utils.py38
-rw-r--r--[-rwxr-xr-x]functest/utils/config.py14
-rw-r--r--[-rwxr-xr-x]functest/utils/constants.py19
-rw-r--r--functest/utils/decorators.py8
-rw-r--r--functest/utils/env.py9
-rw-r--r--functest/utils/functest_utils.py16
-rw-r--r--functest/utils/openstack_utils.py114
-rw-r--r--requirements.txt12
-rw-r--r--test-requirements.txt9
45 files changed, 1700 insertions, 977 deletions
diff --git a/docs/com/pres/framework/framework.md b/docs/com/pres/framework/framework.md
index b80ad3dd7..3c1aae1b8 100644
--- a/docs/com/pres/framework/framework.md
+++ b/docs/com/pres/framework/framework.md
@@ -2,11 +2,11 @@
created by [Cédric Ollivier](mailto:cedric.ollivier@orange.com)
-2017/04/24
+2017/05/06
Note:
-- Functest integrates lots of heteregeounous testcases:
+- Functest integrates lots of heterogeneous testcases:
- python vs bash
- internal vs external
- it aims to benefit from object programming
@@ -33,11 +33,11 @@ Note:
### Our target
- limit run_tests.py instructions by defining:
- - the basic testcase attritutes
+ - the basic testcase attributes
- all common operations
- the status codes expected
- avoid duplicating codes between testcases
-- ease the developpement of third-party testcases (aka features)
+- ease the development of third-party testcases (aka features)
@@ -51,6 +51,7 @@ base model for single test case
- project_name (default: 'functest')
- case_name
- criteria
+- result
- start_time
- stop_time
- details
@@ -61,7 +62,8 @@ base model for single test case
| Method | Purpose |
|-------------------|--------------------------------------------|
| run(**kwargs) | run the test case |
-| check_criteria() | interpret the results of the test case |
+| is_successful() | interpret the results of the test case |
+| get_duration() | return the duration of the test case |
| push_to_db() | push the results of the test case to the DB|
@@ -69,7 +71,7 @@ base model for single test case
- the subclasses must override the default implementation which is false on purpose
- the new implementation must set the following attributes to push the results to DB:
- - criteria
+ - result
- start_time
- stop_time
@@ -99,7 +101,7 @@ except KeyError:
if result == testcase.TestCase.EX_OK:
if GlobalVariables.REPORT_FLAG:
test_case.push_to_db()
- result = test_case.check_criteria()
+ result = test_case.is_successful()
```
@@ -121,7 +123,7 @@ class Test(testcase.TestCase):
def run(self, **kwargs):
self.start_time = time.time()
print "Hello World"
- self.criteria = 'PASS'
+ self.result = 100
self.stop_time = time.time()
return testcase.TestCase.EX_OK
```
@@ -132,7 +134,7 @@ class Test(testcase.TestCase):
```yaml
case_name: first
project_name: functest
-criteria: 'status == "PASS"'
+criteria: 100
blocking: true
clean_flag: false
description: ''
@@ -164,7 +166,7 @@ base model for single feature
- allows executing any Python method by calling execute()
- sets the following attributes required to push the results to DB:
- - criteria
+ - result
- start_time
- stop_time
- doesn't fulfill details when pushing the results to the DB.
@@ -200,7 +202,7 @@ class Test(feature.Feature):
```yaml
case_name: second
project_name: functest
-criteria: 'status == "PASS"'
+criteria: 100
blocking: true
clean_flag: false
description: ''
@@ -234,7 +236,7 @@ execute the cmd passed as arg.
```
case_name: third
project_name: functest
-criteria: 'status == "PASS"'
+criteria: 100
blocking: true
clean_flag: false
description: ''
diff --git a/functest/ci/config_functest.yaml b/functest/ci/config_functest.yaml
index fd663abca..6d44f3989 100644
--- a/functest/ci/config_functest.yaml
+++ b/functest/ci/config_functest.yaml
@@ -49,6 +49,8 @@ general:
image_name: Cirros-0.3.5
image_name_alt: Cirros-0.3.5-1
image_file_name: cirros-0.3.5-x86_64-disk.img
+ image_url: http://download.cirros-cloud.net/0.3.5/cirros-0.3.5-x86_64-disk.img
+ image_user: cirros
image_disk_format: qcow2
image_username: cirros
image_password: cubswin:)
@@ -71,14 +73,23 @@ general:
functest:
testcases_yaml: /home/opnfv/repos/functest/functest/ci/testcases.yaml
-healthcheck:
- disk_image: /home/opnfv/functest/data/cirros-0.3.5-x86_64-disk.img
- disk_format: qcow2
- wait_time: 60
-
snaps:
use_keystone: True
- use_floating_ips: False
+ use_floating_ips: True
+# images:
+# cirros:
+# disk_url: http://download.cirros-cloud.net/0.3.5/cirros-0.3.5-x86_64-disk.img
+ # ARM
+# disk_url: http://download.cirros-cloud.net/daily/20161201/cirros-d161201-aarch64-disk.img
+# kernel_url: http://download.cirros-cloud.net/daily/20161201/cirros-d161201-aarch64-kernel
+# ramdisk_url: http://download.cirros-cloud.net/daily/20161201/cirros-d161201-aarch64-initramfs
+# extra_properties:
+# os_command_line: root=/dev/vdb1 rw rootwait console=tty0 console=ttyS0 console=ttyAMA0
+# hw_video_model: vga
+# ubuntu:
+# disk_url: http://uec-images.ubuntu.com/releases/trusty/14.04/ubuntu-14.04-server-cloudimg-amd64-disk1.img
+# centos:
+# disk_url: http://cloud.centos.org/centos/7/images/CentOS-7-x86_64-GenericCloud.qcow2
vping:
ping_timeout: 200
@@ -92,6 +103,14 @@ vping:
router_name: vping-router
sg_name: vPing-sg
sg_desc: Security group for vPing test case
+ keypair_name: vPing-keypair
+ keypair_priv_file: /tmp/vPing-keypair
+ keypair_pub_file: /tmp/vPing-keypair.pub
+ vm_boot_timeout: 180
+ vm_delete_timeout: 100
+ vm_ssh_connect_timeout: 60
+ cleanup_objects: True
+ unique_names: True
onos_sfc:
image_base_url: http://artifacts.opnfv.org/sfc/demo
@@ -204,3 +223,8 @@ results:
# you can also set a file (e.g. /home/opnfv/functest/results/dump.txt) to dump results
# test_db_url: file:///home/opnfv/functest/results/dump.txt
test_db_url: http://testresults.opnfv.org/test/api/v1/results
+
+energy_recorder:
+ api_url: http://161.105.253.100:8888/resources
+ api_user: ""
+ api_password: ""
diff --git a/functest/ci/generate_report.py b/functest/ci/generate_report.py
deleted file mode 100644
index 17240e8c0..000000000
--- a/functest/ci/generate_report.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#!/usr/bin/env python
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-import json
-import logging
-import re
-import urllib2
-
-import functest.utils.functest_utils as ft_utils
-from functest.utils.constants import CONST
-
-COL_1_LEN = 25
-COL_2_LEN = 15
-COL_3_LEN = 12
-COL_4_LEN = 15
-COL_5_LEN = 75
-
-# If we run from CI (Jenkins) we will push the results to the DB
-# and then we can print the url to the specific test result
-
-
-logger = logging.getLogger(__name__)
-
-
-def init(tiers_to_run=[]):
- test_cases_arr = []
- for tier in tiers_to_run:
- for test in tier.get_tests():
- test_cases_arr.append({'test_name': test.get_name(),
- 'tier_name': tier.get_name(),
- 'result': 'Not executed',
- 'duration': '0',
- 'url': ''})
- return test_cases_arr
-
-
-def get_results_from_db():
- url = "%s/results?build_tag=%s" % (ft_utils.get_db_url(),
- CONST.BUILD_TAG)
- logger.debug("Query to rest api: %s" % url)
- try:
- data = json.load(urllib2.urlopen(url))
- return data['results']
- except:
- logger.error("Cannot read content from the url: %s" % url)
- return None
-
-
-def get_data(test, results):
- test_result = test['result']
- url = ''
- for test_db in results:
- if test['test_name'] in test_db['case_name']:
- id = test_db['_id']
- url = ft_utils.get_db_url() + '/results/' + id
- test_result = test_db['criteria']
-
- return {"url": url, "result": test_result}
-
-
-def print_line(w1, w2='', w3='', w4='', w5=''):
- str = ('| ' + w1.ljust(COL_1_LEN - 1) +
- '| ' + w2.ljust(COL_2_LEN - 1) +
- '| ' + w3.ljust(COL_3_LEN - 1) +
- '| ' + w4.ljust(COL_4_LEN - 1))
- if CONST.__getattribute__('IS_CI_RUN'):
- str += ('| ' + w5.ljust(COL_5_LEN - 1))
- str += '|\n'
- return str
-
-
-def print_line_no_columns(str):
- TOTAL_LEN = COL_1_LEN + COL_2_LEN + COL_3_LEN + COL_4_LEN + 2
- if CONST.__getattribute__('IS_CI_RUN'):
- TOTAL_LEN += COL_5_LEN + 1
- return ('| ' + str.ljust(TOTAL_LEN) + "|\n")
-
-
-def print_separator(char="=", delimiter="+"):
- str = ("+" + char * COL_1_LEN +
- delimiter + char * COL_2_LEN +
- delimiter + char * COL_3_LEN +
- delimiter + char * COL_4_LEN)
- if CONST.__getattribute__('IS_CI_RUN'):
- str += (delimiter + char * COL_5_LEN)
- str += '+\n'
- return str
-
-
-def main(args=[]):
- executed_test_cases = args
-
- if CONST.__getattribute__('IS_CI_RUN'):
- results = get_results_from_db()
- if results is not None:
- for test in executed_test_cases:
- data = get_data(test, results)
- test.update({"url": data['url'],
- "result": data['result']})
-
- TOTAL_LEN = COL_1_LEN + COL_2_LEN + COL_3_LEN + COL_4_LEN
- if CONST.__getattribute__('IS_CI_RUN'):
- TOTAL_LEN += COL_5_LEN
- MID = TOTAL_LEN / 2
-
- if CONST.__getattribute__('BUILD_TAG') is not None:
- if re.search("daily", CONST.__getattribute__('BUILD_TAG')) is not None:
- CONST.__setattr__('CI_LOOP', 'daily')
- else:
- CONST.__setattr__('CI_LOOP', 'weekly')
-
- str = ''
- str += print_separator('=', delimiter="=")
- str += print_line_no_columns(' ' * (MID - 8) + 'FUNCTEST REPORT')
- str += print_separator('=', delimiter="=")
- str += print_line_no_columns(' ')
- str += print_line_no_columns(" Deployment description:")
- str += print_line_no_columns(" INSTALLER: %s"
- % CONST.__getattribute__('INSTALLER_TYPE'))
- if CONST.__getattribute__('DEPLOY_SCENARIO') is not None:
- str += print_line_no_columns(" SCENARIO: %s"
- % CONST.__getattribute__(
- 'DEPLOY_SCENARIO'))
- if CONST.__getattribute__('BUILD_TAG') is not None:
- str += print_line_no_columns(" BUILD TAG: %s"
- % CONST.__getattribute__('BUILD_TAG'))
- if CONST.__getattribute__('CI_LOOP') is not None:
- str += print_line_no_columns(" CI LOOP: %s"
- % CONST.__getattribute__('CI_LOOP'))
- str += print_line_no_columns(' ')
- str += print_separator('=')
- if CONST.__getattribute__('IS_CI_RUN'):
- str += print_line('TEST CASE', 'TIER', 'DURATION', 'RESULT', 'URL')
- else:
- str += print_line('TEST CASE', 'TIER', 'DURATION', 'RESULT')
- str += print_separator('=')
- for test in executed_test_cases:
- str += print_line(test['test_name'],
- test['tier_name'],
- test['duration'],
- test['result'],
- test['url'])
- str += print_separator('-')
-
- logger.info("\n\n\n%s" % str)
diff --git a/functest/ci/logging.ini b/functest/ci/logging.ini
index 8036ed292..210c8f5f4 100644
--- a/functest/ci/logging.ini
+++ b/functest/ci/logging.ini
@@ -1,5 +1,5 @@
[loggers]
-keys=root,functest,ci,cli,core,opnfv_tests,utils
+keys=root,functest,ci,cli,core,energy,opnfv_tests,utils
[handlers]
keys=console,wconsole,file,null
@@ -31,6 +31,11 @@ level=NOTSET
handlers=console
qualname=functest.core
+[logger_energy]
+level=NOTSET
+handlers=wconsole
+qualname=functest.energy
+
[logger_opnfv_tests]
level=NOTSET
handlers=wconsole
diff --git a/functest/ci/run_tests.py b/functest/ci/run_tests.py
index ae002c2e3..76760096b 100755
--- a/functest/ci/run_tests.py
+++ b/functest/ci/run_tests.py
@@ -17,7 +17,8 @@ import os
import re
import sys
-import functest.ci.generate_report as generate_report
+import prettytable
+
import functest.ci.tier_builder as tb
import functest.core.testcase as testcase
import functest.utils.functest_utils as ft_utils
@@ -39,6 +40,10 @@ class BlockingTestFailed(Exception):
pass
+class TestNotEnabled(Exception):
+ pass
+
+
class RunTestsParser(object):
def __init__(self):
@@ -99,13 +104,6 @@ def cleanup():
os_clean.main()
-def update_test_info(test_name, result, duration):
- for test in GlobalVariables.EXECUTED_TEST_CASES:
- if test['test_name'] == test_name:
- test.update({"result": result,
- "duration": duration})
-
-
def get_run_dict(testname):
try:
dict = ft_utils.get_dict_by_test(testname)
@@ -120,8 +118,9 @@ def get_run_dict(testname):
def run_test(test, tier_name, testcases=None):
- duration = "XX:XX"
- result_str = "PASS"
+ if not test.is_enabled():
+ raise TestNotEnabled("The test case {} is not enabled"
+ .format(test.get_name()))
test_name = test.get_name()
logger.info("\n") # blank line
print_separator("=")
@@ -145,6 +144,7 @@ def run_test(test, tier_name, testcases=None):
cls = getattr(module, run_dict['class'])
test_dict = ft_utils.get_dict_by_test(test_name)
test_case = cls(**test_dict)
+ GlobalVariables.EXECUTED_TEST_CASES.append(test_case)
try:
kwargs = run_dict['args']
result = test_case.run(**kwargs)
@@ -154,7 +154,7 @@ def run_test(test, tier_name, testcases=None):
if GlobalVariables.REPORT_FLAG:
test_case.push_to_db()
result = test_case.is_successful()
- duration = test_case.get_duration()
+ logger.info("Test result:\n\n%s\n", test_case)
except ImportError:
logger.exception("Cannot import module {}".format(
run_dict['module']))
@@ -166,24 +166,13 @@ def run_test(test, tier_name, testcases=None):
if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
cleanup()
-
- logger.info("Test execution time: %s", duration)
-
if result != testcase.TestCase.EX_OK:
logger.error("The test case '%s' failed. " % test_name)
GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
- result_str = "FAIL"
-
if test.is_blocking():
- if not testcases or testcases == "all":
- # if it is a single test we don't print the whole results table
- update_test_info(test_name, result_str, duration)
- generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
raise BlockingTestFailed("The test case {} failed and is blocking"
.format(test.get_name()))
- update_test_info(test_name, result_str, duration)
-
def run_tier(tier):
tier_name = tier.get_name()
@@ -215,12 +204,9 @@ def run_all(tiers):
tier.get_test_names()))
logger.info("Tests to be executed:%s" % summary)
- GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
for tier in tiers_to_run:
run_tier(tier)
- generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
-
def main(**kwargs):
@@ -239,12 +225,10 @@ def main(**kwargs):
if kwargs['test']:
source_rc_file()
if _tiers.get_tier(kwargs['test']):
- GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(
- [_tiers.get_tier(kwargs['test'])])
run_tier(_tiers.get_tier(kwargs['test']))
elif _tiers.get_test(kwargs['test']):
run_test(_tiers.get_test(kwargs['test']),
- _tiers.get_tier(kwargs['test']),
+ _tiers.get_tier_name(kwargs['test']),
kwargs['test'])
elif kwargs['test'] == "all":
run_all(_tiers)
@@ -262,6 +246,26 @@ def main(**kwargs):
except Exception as e:
logger.error(e)
GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
+
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['env var', 'value'])
+ for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG',
+ 'CI_LOOP']:
+ msg.add_row([env_var, CONST.__getattribute__(env_var)])
+ logger.info("Deployment description: \n\n%s\n", msg)
+
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['test case', 'project', 'tier', 'duration', 'result'])
+ for test_case in GlobalVariables.EXECUTED_TEST_CASES:
+ result = 'PASS' if(test_case.is_successful(
+ ) == test_case.EX_OK) else 'FAIL'
+ msg.add_row([test_case.case_name, test_case.project_name,
+ _tiers.get_tier_name(test_case.case_name),
+ test_case.get_duration(), result])
+ logger.info("FUNCTEST REPORT: \n\n%s\n", msg)
+
logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT)
return GlobalVariables.OVERALL_RESULT
diff --git a/functest/ci/testcases.yaml b/functest/ci/testcases.yaml
index 7009e910c..d98a2de23 100644
--- a/functest/ci/testcases.yaml
+++ b/functest/ci/testcases.yaml
@@ -19,7 +19,6 @@ tiers:
simple queries. When the config value of
snaps.use_keystone is True, functest must have access to
the cloud's private network.
-
dependencies:
installer: '^((?!netvirt).)*$'
scenario: ''
@@ -39,13 +38,13 @@ tiers:
simple queries. When the config value of
snaps.use_keystone is True, functest must have access to
the cloud's private network.
-
dependencies:
installer: '^((?!netvirt).)*$'
scenario: ''
run:
module: 'functest.opnfv_tests.openstack.snaps.api_check'
class: 'ApiCheck'
+
-
case_name: snaps_health_check
project_name: functest
@@ -63,6 +62,7 @@ tiers:
run:
module: 'functest.opnfv_tests.openstack.snaps.health_check'
class: 'HealthCheck'
+
-
name: smoke
order: 1
@@ -266,6 +266,7 @@ tiers:
testcases:
-
case_name: promise
+ enabled: false
project_name: promise
criteria: 100
blocking: false
@@ -283,6 +284,7 @@ tiers:
-
case_name: doctor-notification
+ enabled: false
project_name: doctor
criteria: 100
blocking: false
@@ -300,6 +302,7 @@ tiers:
-
case_name: bgpvpn
+ enabled: false
project_name: sdnvpn
criteria: 100
blocking: false
@@ -317,6 +320,7 @@ tiers:
-
case_name: security_scan
+ enabled: false
project_name: securityscanning
criteria: 100
blocking: false
@@ -334,6 +338,7 @@ tiers:
-
case_name: copper
+ enabled: false
project_name: copper
criteria: 100
blocking: false
@@ -351,6 +356,7 @@ tiers:
-
case_name: multisite
+ enabled: false
project_name: multisite
criteria: 100
blocking: false
@@ -363,8 +369,10 @@ tiers:
run:
module: 'functest.opnfv_tests.openstack.tempest.tempest'
class: 'TempestMultisite'
+
-
case_name: functest-odl-sfc
+ enabled: false
project_name: sfc
criteria: 100
blocking: false
@@ -379,8 +387,10 @@ tiers:
class: 'BashFeature'
args:
cmd: 'cd /home/opnfv/repos/sfc/sfc/tests/functest && python ./run_tests.py'
+
-
case_name: onos_sfc
+ enabled: false
project_name: functest
criteria: 100
blocking: true
@@ -393,8 +403,10 @@ tiers:
run:
module: 'functest.opnfv_tests.sdn.onos.onos'
class: 'OnosSfc'
+
-
case_name: parser-basics
+ enabled: false
project_name: parser
criteria: 100
blocking: false
@@ -409,8 +421,10 @@ tiers:
class: 'BashFeature'
args:
cmd: 'cd /home/opnfv/repos/parser/tests && ./functest_run.sh'
+
-
case_name: domino-multinode
+ enabled: false
project_name: domino
criteria: 100
blocking: false
@@ -425,8 +439,10 @@ tiers:
class: 'BashFeature'
args:
cmd: 'cd /home/opnfv/repos/domino && ./tests/run_multinode.sh'
+
-
case_name: gluon_vping
+ enabled: false
project_name: netready
criteria: 100
blocking: false
@@ -441,8 +457,10 @@ tiers:
class: 'BashFeature'
args:
cmd: 'cd /home/opnfv/repos/netready/test/functest && python ./gluon-test-suite.py'
+
-
case_name: barometercollectd
+ enabled: false
project_name: barometer
criteria: 100
blocking: false
@@ -458,6 +476,7 @@ tiers:
run:
module: 'functest.opnfv_tests.features.barometer'
class: 'BarometerCollectd'
+
-
name: components
order: 3
@@ -481,6 +500,7 @@ tiers:
run:
module: 'functest.opnfv_tests.openstack.tempest.tempest'
class: 'TempestFullParallel'
+
-
case_name: tempest_custom
project_name: functest
@@ -499,6 +519,7 @@ tiers:
run:
module: 'functest.opnfv_tests.openstack.tempest.tempest'
class: 'TempestCustom'
+
-
case_name: rally_full
project_name: functest
@@ -537,22 +558,26 @@ tiers:
run:
module: 'functest.opnfv_tests.vnf.ims.cloudify_ims'
class: 'CloudifyIms'
-# -
-# case_name: aaa
-# project_name: functest
-# criteria: 100
-# blocking: false
-# clean_flag: true
-# description: >-
-# Test suite from Parser project.
-# dependencies:
-# installer: ''
-# scenario: ''
-# run:
-# module: 'functest.opnfv_tests.vnf.aaa.aaa'
-# class: 'AaaVnf'
+
+ -
+ case_name: aaa
+ enabled: false
+ project_name: functest
+ criteria: 100
+ blocking: false
+ clean_flag: true
+ description: >-
+ Test suite from Parser project.
+ dependencies:
+ installer: ''
+ scenario: ''
+ run:
+ module: 'functest.opnfv_tests.vnf.aaa.aaa'
+ class: 'AaaVnf'
+
-
case_name: orchestra_ims
+ enabled: false
project_name: functest
criteria: 100
blocking: false
@@ -568,6 +593,7 @@ tiers:
-
case_name: opera_vims
+ enabled: false
project_name: opera
criteria: 100
blocking: false
@@ -583,6 +609,7 @@ tiers:
-
case_name: vyos_vrouter
+ enabled: false
project_name: functest
criteria: 100
blocking: false
diff --git a/functest/ci/tier_builder.py b/functest/ci/tier_builder.py
index 44b272584..12562f09c 100644
--- a/functest/ci/tier_builder.py
+++ b/functest/ci/tier_builder.py
@@ -47,12 +47,15 @@ class TierBuilder(object):
dep = th.Dependency(installer, scenario)
testcase = th.TestCase(name=dic_testcase['case_name'],
+ enabled=dic_testcase.get(
+ 'enabled', True),
dependency=dep,
criteria=dic_testcase['criteria'],
blocking=dic_testcase['blocking'],
clean_flag=dic_testcase['clean_flag'],
description=dic_testcase['description'])
- if testcase.is_compatible(self.ci_installer, self.ci_scenario):
+ if (testcase.is_compatible(self.ci_installer, self.ci_scenario)
+ and testcase.is_enabled()):
tier.add_test(testcase)
self.tier_objects.append(tier)
@@ -72,6 +75,12 @@ class TierBuilder(object):
return self.tier_objects[i]
return None
+ def get_tier_name(self, test_name):
+ for i in range(0, len(self.tier_objects)):
+ if self.tier_objects[i].is_test(test_name):
+ return self.tier_objects[i].name
+ return None
+
def get_test(self, test_name):
for i in range(0, len(self.tier_objects)):
if self.tier_objects[i].is_test(test_name):
diff --git a/functest/ci/tier_handler.py b/functest/ci/tier_handler.py
index fe7372a38..36ce245e7 100644
--- a/functest/ci/tier_handler.py
+++ b/functest/ci/tier_handler.py
@@ -105,12 +105,14 @@ class Tier(object):
class TestCase(object):
def __init__(self, name,
+ enabled,
dependency,
criteria,
blocking,
clean_flag,
description=""):
self.name = name
+ self.enabled = enabled
self.dependency = dependency
self.criteria = criteria
self.blocking = blocking
@@ -138,6 +140,9 @@ class TestCase(object):
def get_name(self):
return self.name
+ def is_enabled(self):
+ return self.enabled
+
def get_criteria(self):
return self.criteria
diff --git a/functest/core/feature.py b/functest/core/feature.py
index 8563c9257..140c9bb2e 100644
--- a/functest/core/feature.py
+++ b/functest/core/feature.py
@@ -83,7 +83,6 @@ class Feature(base.TestCase):
ft_utils.logger_test_results(
self.project_name, self.case_name,
self.result, self.details)
- self.__logger.info("%s %s", self.project_name, self.result)
except Exception: # pylint: disable=broad-except
self.__logger.exception("%s FAILED", self.project_name)
self.__logger.info("Test result is stored in '%s'", self.result_file)
diff --git a/functest/core/pytest_suite_runner.py b/functest/core/pytest_suite_runner.py
index 8b5da05ef..bcbaa25d3 100644
--- a/functest/core/pytest_suite_runner.py
+++ b/functest/core/pytest_suite_runner.py
@@ -5,18 +5,25 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
-import testcase as base
+# pylint: disable=missing-docstring
+
+import logging
import unittest
import time
+from functest.core import testcase
+
+logging.basicConfig()
+
-class PyTestSuiteRunner(base.TestCase):
+class PyTestSuiteRunner(testcase.TestCase):
"""
This superclass is designed to execute pre-configured unittest.TestSuite()
objects
"""
def __init__(self, **kwargs):
super(PyTestSuiteRunner, self).__init__(**kwargs)
+ self.logger = logging.getLogger(self.__class__.__name__)
self.suite = None
def run(self, **kwargs):
@@ -44,14 +51,14 @@ class PyTestSuiteRunner(base.TestCase):
# we shall distinguish Execution Error from FAIL results
# TestCase.EX_RUN_ERROR means that the test case was not run
# not that it was run but the result was FAIL
- exit_code = base.TestCase.EX_OK
- if ((result.errors and len(result.errors) > 0)
- or (result.failures and len(result.failures) > 0)):
- self.logger.info("%s FAILED" % self.case_name)
- self.result = 'FAIL'
+ exit_code = testcase.TestCase.EX_OK
+ if ((result.errors and len(result.errors) > 0) or
+ (result.failures and len(result.failures) > 0)):
+ self.logger.info("%s FAILED", self.case_name)
+ self.result = 0
else:
- self.logger.info("%s OK" % self.case_name)
- self.result = 'PASS'
+ self.logger.info("%s OK", self.case_name)
+ self.result = 100
self.details = {}
return exit_code
diff --git a/functest/core/testcase.py b/functest/core/testcase.py
index 624655424..431615250 100644
--- a/functest/core/testcase.py
+++ b/functest/core/testcase.py
@@ -12,6 +12,8 @@
import logging
import os
+import prettytable
+
import functest.utils.functest_utils as ft_utils
__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
@@ -39,9 +41,26 @@ class TestCase(object):
self.project_name = kwargs.get('project_name', 'functest')
self.case_name = kwargs.get('case_name', '')
self.criteria = kwargs.get('criteria', 100)
- self.result = ""
- self.start_time = ""
- self.stop_time = ""
+ self.result = 0
+ self.start_time = 0
+ self.stop_time = 0
+
+ def __str__(self):
+ try:
+ assert self.project_name
+ assert self.case_name
+ result = 'PASS' if(self.is_successful(
+ ) == TestCase.EX_OK) else 'FAIL'
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['test case', 'project', 'duration',
+ 'result'])
+ msg.add_row([self.case_name, self.project_name,
+ self.get_duration(), result])
+ return msg.get_string()
+ except AssertionError:
+ self.__logger.error("We cannot print invalid objects")
+ return super(TestCase, self).__str__()
def get_duration(self):
"""Return the duration of the test case.
@@ -57,7 +76,7 @@ class TestCase(object):
return "XX:XX"
return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod(
self.stop_time - self.start_time, 60))
- except Exception:
+ except Exception: # pylint: disable=broad-except
self.__logger.error("Please run test before getting the duration")
return "XX:XX"
@@ -75,7 +94,9 @@ class TestCase(object):
"""
try:
assert self.criteria
- if isinstance(self.result, int) and isinstance(self.criteria, int):
+ assert self.result is not None
+ if (not isinstance(self.result, str) and
+ not isinstance(self.criteria, str)):
if self.result >= self.criteria:
return TestCase.EX_OK
else:
diff --git a/functest/core/vnf_base.py b/functest/core/vnf.py
index bc5bf9a71..5667b2997 100644
--- a/functest/core/vnf_base.py
+++ b/functest/core/vnf.py
@@ -17,12 +17,12 @@ import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
-class VnfOnBoardingBase(base.TestCase):
+class VnfOnBoarding(base.TestCase):
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
- super(VnfOnBoardingBase, self).__init__(**kwargs)
+ super(VnfOnBoarding, self).__init__(**kwargs)
self.repo = kwargs.get('repo', '')
self.cmd = kwargs.get('cmd', '')
self.details = {}
diff --git a/functest/energy/__init__.py b/functest/energy/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/functest/energy/__init__.py
diff --git a/functest/energy/energy.py b/functest/energy/energy.py
new file mode 100644
index 000000000..a20c79926
--- /dev/null
+++ b/functest/energy/energy.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""This module manages calls to Energy recording API."""
+
+import json
+import logging
+import urllib
+import requests
+
+import functest.utils.functest_utils as ft_utils
+
+
+def enable_recording(method):
+ """
+ Decorator to record energy during "method" exection.
+
+ param method: Method to suround with start and stop
+ :type method: function
+
+ .. note:: "method" should belong to a class having a "case_name"
+ attribute
+ """
+ def wrapper(*args):
+ """Wrapper for decorator to handle method arguments."""
+ EnergyRecorder.start(args[0].case_name)
+ return_value = method(*args)
+ EnergyRecorder.stop()
+ return return_value
+ return wrapper
+
+
+# Class to manage energy recording sessions
+class EnergyRecorder(object):
+ """Manage Energy recording session."""
+
+ logger = logging.getLogger(__name__)
+ # Energy recording API connectivity settings
+ # see load_config method
+ energy_recorder_api = None
+
+ # Default initial step
+ INITIAL_STEP = "starting"
+
+ @staticmethod
+ def load_config():
+ """
+ Load connectivity settings from yaml.
+
+ Load connectivity settings to Energy recording API
+ Use functest global config yaml file
+ (see functest_utils.get_functest_config)
+ """
+ # Singleton pattern for energy_recorder_api static member
+ # Load only if not previouly done
+ if EnergyRecorder.energy_recorder_api is None:
+ environment = ft_utils.get_pod_name()
+
+ # API URL
+ energy_recorder_uri = ft_utils.get_functest_config(
+ "energy_recorder.api_url")
+ assert energy_recorder_uri
+ assert environment
+
+ energy_recorder_uri += "/recorders/environment/"
+ energy_recorder_uri += urllib.quote_plus(environment)
+ EnergyRecorder.logger.debug(
+ "API recorder at: " + energy_recorder_uri)
+
+ # Creds
+ user = ft_utils.get_functest_config(
+ "energy_recorder.api_user")
+ password = ft_utils.get_functest_config(
+ "energy_recorder.api_password")
+
+ if user != "" and password != "":
+ energy_recorder_api_auth = (user, password)
+ else:
+ energy_recorder_api_auth = None
+
+ # Final config
+ EnergyRecorder.energy_recorder_api = {
+ "uri": energy_recorder_uri,
+ "auth": energy_recorder_api_auth
+ }
+
+ @staticmethod
+ def start(scenario):
+ """
+ Start a recording session for scenario.
+
+ param scenario: Starting scenario
+ :type scenario: string
+ """
+ return_status = True
+ try:
+ EnergyRecorder.logger.debug("Starting recording")
+ # Ensure that connectyvity settings are loaded
+ EnergyRecorder.load_config()
+
+ # Create API payload
+ payload = {
+ "step": EnergyRecorder.INITIAL_STEP,
+ "scenario": scenario
+ }
+ # Call API to start energy recording
+ response = requests.post(
+ EnergyRecorder.energy_recorder_api["uri"],
+ data=json.dumps(payload),
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers={
+ 'content-type': 'application/json'
+ }
+ )
+ if response.status_code != 200:
+ log_msg = "Error while starting energy recording session\n{}"
+ log_msg = log_msg.format(response.text)
+ EnergyRecorder.logger.info(log_msg)
+ return_status = False
+
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.exception(
+ "Error while starting energy recorder API"
+ )
+ return_status = False
+ return return_status
+
+ @staticmethod
+ def stop():
+ """Stop current recording session."""
+ EnergyRecorder.logger.debug("Stopping recording")
+ return_status = True
+ try:
+ # Ensure that connectyvity settings are loaded
+ EnergyRecorder.load_config()
+
+ # Call API to stop energy recording
+ response = requests.delete(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers={
+ 'content-type': 'application/json'
+ }
+ )
+ if response.status_code != 200:
+ log_msg = "Error while stating energy recording session\n{}"
+ log_msg = log_msg.format(response.text)
+ EnergyRecorder.logger.error(log_msg)
+ return_status = False
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.exception(
+ "Error while stoping energy recorder API"
+ )
+ return_status = False
+ return return_status
+
+ @staticmethod
+ def set_step(step):
+ """Notify energy recording service of current step of the testcase."""
+ EnergyRecorder.logger.debug("Setting step")
+ return_status = True
+ try:
+ # Ensure that connectyvity settings are loaded
+ EnergyRecorder.load_config()
+
+ # Create API payload
+ payload = {
+ "step": step,
+ }
+
+ # Call API to define step
+ response = requests.post(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ data=json.dumps(payload),
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers={
+ 'content-type': 'application/json'
+ }
+ )
+ if response.status_code != 200:
+ log_msg = "Error while setting current step of testcase\n{}"
+ log_msg = log_msg.format(response.text)
+ EnergyRecorder.logger.error(log_msg)
+ return_status = False
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.exception(
+ "Error while setting step on energy recorder API"
+ )
+ return_status = False
+ return return_status
diff --git a/functest/opnfv_tests/openstack/snaps/health_check.py b/functest/opnfv_tests/openstack/snaps/health_check.py
index c057eb2b0..0daddcdd2 100644
--- a/functest/opnfv_tests/openstack/snaps/health_check.py
+++ b/functest/opnfv_tests/openstack/snaps/health_check.py
@@ -23,14 +23,15 @@ class HealthCheck(SnapsTestRunner):
"""
def __init__(self, **kwargs):
if "case_name" not in kwargs:
- kwargs["case_name"] = "snaps_health_check"
+ kwargs["case_name"] = "snaps_images_cirros"
super(HealthCheck, self).__init__(**kwargs)
self.suite = unittest.TestSuite()
image_custom_config = None
- if hasattr(CONST, 'snaps_health_check'):
- image_custom_config = CONST.__getattribute__('snaps_health_check')
+
+ if hasattr(CONST, 'snaps_images_cirros'):
+ image_custom_config = CONST.__getattribute__('snaps_images_cirros')
self.suite.addTest(
OSIntegrationTestCase.parameterize(
SimpleHealthCheck, os_creds=self.os_creds,
diff --git a/functest/opnfv_tests/openstack/snaps/smoke.py b/functest/opnfv_tests/openstack/snaps/smoke.py
index 2c6fc2553..d9f95e90c 100644
--- a/functest/opnfv_tests/openstack/snaps/smoke.py
+++ b/functest/opnfv_tests/openstack/snaps/smoke.py
@@ -28,11 +28,9 @@ class SnapsSmoke(SnapsTestRunner):
self.suite = unittest.TestSuite()
- # The snaps smoke test uses the same config as the
- # snaps_health_check suite, so reuse it here
- image_custom_config = None
- if hasattr(CONST, 'snaps_health_check'):
- image_custom_config = CONST.__getattribute__('snaps_health_check')
+ image_config = None
+ if hasattr(CONST, 'snaps_images_cirros'):
+ image_config = CONST.__getattribute__('snaps_images_cirros')
# Tests requiring floating IPs leverage files contained within the
# SNAPS repository and are found relative to that path
@@ -47,5 +45,5 @@ class SnapsSmoke(SnapsTestRunner):
ext_net_name=self.ext_net_name,
use_keystone=self.use_keystone,
flavor_metadata=self.flavor_metadata,
- image_metadata=image_custom_config,
+ image_metadata=image_config,
use_floating_ips=self.use_fip)
diff --git a/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py b/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py
index 8a68cad9e..94b97551a 100644
--- a/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py
+++ b/functest/opnfv_tests/openstack/snaps/snaps_test_runner.py
@@ -5,6 +5,8 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
+import logging
+
from functest.core.pytest_suite_runner import PyTestSuiteRunner
from functest.opnfv_tests.openstack.snaps import snaps_utils
from functest.utils import functest_utils
@@ -16,12 +18,11 @@ from snaps.openstack.tests import openstack_tests
class SnapsTestRunner(PyTestSuiteRunner):
"""
- This test executes the SNAPS Python Test case SimpleHealthCheck which
- creates a VM with a single port with an IPv4 address that is assigned by
- DHCP. This test then validates the expected IP with the actual
+ This test executes the SNAPS Python Tests
"""
def __init__(self, **kwargs):
super(SnapsTestRunner, self).__init__(**kwargs)
+ self.logger = logging.getLogger(__name__)
self.os_creds = openstack_tests.get_credentials(
os_env_file=CONST.__getattribute__('openstack_creds'),
diff --git a/functest/opnfv_tests/openstack/vping/vping_base.py b/functest/opnfv_tests/openstack/vping/vping_base.py
index 8bf263eb2..8e71bf82c 100644
--- a/functest/opnfv_tests/openstack/vping/vping_base.py
+++ b/functest/opnfv_tests/openstack/vping/vping_base.py
@@ -7,281 +7,194 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
+from datetime import datetime
+import logging
import os
-import pprint
import time
-from datetime import datetime
+import uuid
-import functest.core.testcase as testcase
-import functest.utils.openstack_utils as os_utils
+from functest.core.testcase import TestCase
+from functest.utils import functest_utils
from functest.utils.constants import CONST
+from snaps.openstack import create_flavor
+from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor
+from snaps.openstack.create_network import NetworkSettings, SubnetSettings
+from snaps.openstack.tests import openstack_tests
+from snaps.openstack.utils import deploy_utils, nova_utils
+
+
+class VPingBase(TestCase):
+
+ """
+ Base class for vPing tests that check connectivity between two VMs shared
+ internal network.
+ This class is responsible for creating the image, internal network.
+ """
-class VPingBase(testcase.TestCase):
def __init__(self, **kwargs):
super(VPingBase, self).__init__(**kwargs)
- self.logger = None
- self.functest_repo = CONST.dir_repo_functest
- self.repo = CONST.dir_vping
- self.vm1_name = CONST.vping_vm_name_1
- self.vm2_name = CONST.vping_vm_name_2
- self.vm_boot_timeout = 180
- self.vm_delete_timeout = 100
- self.ping_timeout = CONST.vping_ping_timeout
-
- self.image_name = CONST.vping_image_name
- self.image_filename = CONST.openstack_image_file_name
- self.image_format = CONST.openstack_image_disk_format
- self.image_username = CONST.openstack_image_username
- self.image_password = CONST.openstack_image_password
- self.image_path = os.path.join(CONST.dir_functest_data,
- self.image_filename)
-
- self.flavor_name = CONST.vping_vm_flavor
+
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ self.functest_repo = CONST.__getattribute__('dir_repo_functest')
+ self.guid = ''
+ if CONST.__getattribute__('vping_unique_names'):
+ self.guid = '-' + str(uuid.uuid4())
+
+ self.os_creds = openstack_tests.get_credentials(
+ os_env_file=CONST.__getattribute__('openstack_creds'))
+
+ self.repo = CONST.__getattribute__('dir_vping')
+
+ self.creators = list()
+ self.image_creator = None
+ self.network_creator = None
+ self.vm1_creator = None
+ self.vm2_creator = None
+
+ self.self_cleanup = CONST.__getattribute__('vping_cleanup_objects')
+
+ # Image constants
+ self.image_name =\
+ CONST.__getattribute__('vping_image_name') + self.guid
+
+ # VM constants
+ self.vm1_name = CONST.__getattribute__('vping_vm_name_1') + self.guid
+ self.vm2_name = CONST.__getattribute__('vping_vm_name_2') + self.guid
+ self.vm_boot_timeout = CONST.__getattribute__('vping_vm_boot_timeout')
+ self.vm_delete_timeout =\
+ CONST.__getattribute__('vping_vm_delete_timeout')
+ self.vm_ssh_connect_timeout = CONST.vping_vm_ssh_connect_timeout
+ self.ping_timeout = CONST.__getattribute__('vping_ping_timeout')
+ self.flavor_name = 'vping-flavor' + self.guid
# NEUTRON Private Network parameters
- self.private_net_name = CONST.vping_private_net_name
- self.private_subnet_name = CONST.vping_private_subnet_name
- self.private_subnet_cidr = CONST.vping_private_subnet_cidr
- self.router_name = CONST.vping_router_name
- self.sg_name = CONST.vping_sg_name
- self.sg_desc = CONST.vping_sg_desc
- self.neutron_client = os_utils.get_neutron_client()
- self.glance_client = os_utils.get_glance_client()
- self.nova_client = os_utils.get_nova_client()
-
- def run(self, **kwargs):
- if not self.check_repo_exist():
- return testcase.TestCase.EX_RUN_ERROR
-
- image_id = self.create_image()
- if not image_id:
- return testcase.TestCase.EX_RUN_ERROR
-
- flavor = self.get_flavor()
- if not flavor:
- return testcase.TestCase.EX_RUN_ERROR
-
- network_id = self.create_network_full()
- if not network_id:
- return testcase.TestCase.EX_RUN_ERROR
-
- sg_id = self.create_security_group()
- if not sg_id:
- return testcase.TestCase.EX_RUN_ERROR
-
- self.delete_exist_vms()
+ self.private_net_name =\
+ CONST.__getattribute__('vping_private_net_name') + self.guid
+ self.private_subnet_name =\
+ CONST.__getattribute__('vping_private_subnet_name') + self.guid
+ self.private_subnet_cidr =\
+ CONST.__getattribute__('vping_private_subnet_cidr')
- self.start_time = time.time()
- self.logger.info("vPing Start Time:'%s'" % (
- datetime.fromtimestamp(self.start_time).strftime(
- '%Y-%m-%d %H:%M:%S')))
+ scenario = functest_utils.get_scenario()
- vm1 = self.boot_vm(self.vm1_name,
- image_id,
- flavor,
- network_id,
- None,
- sg_id)
- if not vm1:
- return testcase.TestCase.EX_RUN_ERROR
-
- test_ip = self.get_test_ip(vm1)
- vm2 = self.boot_vm(self.vm2_name,
- image_id,
- flavor,
- network_id,
- test_ip,
- sg_id)
- if not vm2:
- return testcase.TestCase.EX_RUN_ERROR
-
- EXIT_CODE = self.do_vping(vm2, test_ip)
- if EXIT_CODE == testcase.TestCase.EX_RUN_ERROR:
- return EXIT_CODE
+ self.flavor_metadata = create_flavor.MEM_PAGE_SIZE_ANY
+ if 'ovs' in scenario or 'fdio' in scenario:
+ self.flavor_metadata = create_flavor.MEM_PAGE_SIZE_LARGE
- self.stop_time = time.time()
- self.parse_result(EXIT_CODE,
- self.start_time,
- self.stop_time)
- return testcase.TestCase.EX_OK
+ self.cirros_image_config = None
- def boot_vm_preparation(self, config, vmname, test_ip):
- pass
+ # Move this configuration option up for all tests to leverage
+ if hasattr(CONST, 'snaps_images_cirros'):
+ self.cirros_image_config = CONST.__getattribute__(
+ 'snaps_images_cirros')
- def do_vping(self, vm, test_ip):
- raise NotImplementedError('vping execution is not implemented')
+ def run(self):
+ """
+ Begins the test execution which should originate from the subclass
+ """
- def check_repo_exist(self):
if not os.path.exists(self.functest_repo):
- self.logger.error("Functest repository not found '%s'"
- % self.functest_repo)
- return False
- return True
+ raise Exception(
+ "Functest repository not found '%s'" % self.functest_repo)
- def create_image(self):
- _, image_id = os_utils.get_or_create_image(self.image_name,
- self.image_path,
- self.image_format)
- if not image_id:
- return None
+ self.logger.info('Begin virtual environment setup')
- return image_id
+ self.start_time = time.time()
+ self.logger.info("vPing Start Time:'%s'" % (
+ datetime.fromtimestamp(self.start_time).strftime(
+ '%Y-%m-%d %H:%M:%S')))
- def get_flavor(self):
- try:
- flavor = self.nova_client.flavors.find(name=self.flavor_name)
- self.logger.info("Using existing Flavor '%s'..."
- % self.flavor_name)
- return flavor
- except:
- self.logger.error("Flavor '%s' not found." % self.flavor_name)
- self.logger.info("Available flavors are: ")
- self.pMsg(self.nova_client.flavor.list())
- return None
-
- def create_network_full(self):
- network_dic = os_utils.create_network_full(self.neutron_client,
- self.private_net_name,
- self.private_subnet_name,
- self.router_name,
- self.private_subnet_cidr)
-
- if not network_dic:
- self.logger.error(
- "There has been a problem when creating the neutron network")
- return None
- network_id = network_dic["net_id"]
- return network_id
-
- def create_security_group(self):
- sg_id = os_utils.get_security_group_id(self.neutron_client,
- self.sg_name)
- if sg_id != '':
- self.logger.info("Using existing security group '%s'..."
- % self.sg_name)
+ self.__delete_exist_vms()
+
+ image_base_name = self.image_name + '-' + str(self.guid)
+ os_image_settings = openstack_tests.cirros_image_settings(
+ image_base_name, image_metadata=self.cirros_image_config)
+ self.logger.info("Creating image with name: '%s'" % self.image_name)
+
+ self.image_creator = deploy_utils.create_image(
+ self.os_creds, os_image_settings)
+ self.creators.append(self.image_creator)
+
+ self.logger.info(
+ "Creating network with name: '%s'" % self.private_net_name)
+ self.network_creator = deploy_utils.create_network(
+ self.os_creds,
+ NetworkSettings(name=self.private_net_name,
+ subnet_settings=[SubnetSettings(
+ name=self.private_subnet_name,
+ cidr=self.private_subnet_cidr)]))
+ self.creators.append(self.network_creator)
+
+ self.logger.info(
+ "Creating flavor with name: '%s'" % self.flavor_name)
+ flavor_creator = OpenStackFlavor(
+ self.os_creds,
+ FlavorSettings(name=self.flavor_name, ram=512, disk=1, vcpus=1,
+ metadata=self.flavor_metadata))
+ flavor_creator.create()
+ self.creators.append(flavor_creator)
+
+ def _execute(self):
+ """
+ Method called by subclasses after environment has been setup
+ :return: the exit code
+ """
+ self.logger.info('Begin test execution')
+
+ test_ip = self.vm1_creator.get_port_ip(
+ self.vm1_creator.instance_settings.port_settings[0].name)
+
+ if self.vm1_creator.vm_active(
+ block=True) and self.vm2_creator.vm_active(block=True):
+ result = self._do_vping(self.vm2_creator, test_ip)
else:
- self.logger.info("Creating security group '%s'..."
- % self.sg_name)
- SECGROUP = os_utils.create_security_group(self.neutron_client,
- self.sg_name,
- self.sg_desc)
- if not SECGROUP:
- self.logger.error("Failed to create the security group...")
- return None
-
- sg_id = SECGROUP['id']
-
- self.logger.debug("Security group '%s' with ID=%s created "
- "successfully." % (SECGROUP['name'], sg_id))
-
- self.logger.debug("Adding ICMP rules in security group '%s'..."
- % self.sg_name)
- if not os_utils.create_secgroup_rule(self.neutron_client, sg_id,
- 'ingress', 'icmp'):
- self.logger.error("Failed to create security group rule...")
- return None
-
- self.logger.debug("Adding SSH rules in security group '%s'..."
- % self.sg_name)
- if not os_utils.create_secgroup_rule(self.neutron_client, sg_id,
- 'ingress', 'tcp',
- '22', '22'):
- self.logger.error("Failed to create security group rule...")
- return None
-
- if not os_utils.create_secgroup_rule(
- self.neutron_client, sg_id, 'egress', 'tcp', '22', '22'):
- self.logger.error("Failed to create security group rule...")
- return None
- return sg_id
-
- def delete_exist_vms(self):
- servers = self.nova_client.servers.list()
+ raise Exception('VMs never became active')
+
+ if result == TestCase.EX_RUN_ERROR:
+ return TestCase.EX_RUN_ERROR
+
+ self.stop_time = time.time()
+ self.result = 100
+ return TestCase.EX_OK
+
+ def _cleanup(self):
+ """
+ Cleanup all OpenStack objects. Should be called on completion
+ :return:
+ """
+ if self.self_cleanup:
+ for creator in reversed(self.creators):
+ try:
+ creator.clean()
+ except Exception as e:
+ self.logger.error('Unexpected error cleaning - %s', e)
+
+ def _do_vping(self, vm_creator, test_ip):
+ """
+ Method to be implemented by subclasses
+ Begins the real test after the OpenStack environment has been setup
+ :param vm_creator: the SNAPS VM instance creator object
+ :param test_ip: the IP to which the VM needs to issue the ping
+ :return: T/F
+ """
+ raise NotImplementedError('vping execution is not implemented')
+
+ def __delete_exist_vms(self):
+ """
+ Cleans any existing VMs using the same name.
+ """
+ nova_client = nova_utils.nova_client(self.os_creds)
+ servers = nova_client.servers.list()
for server in servers:
if server.name == self.vm1_name or server.name == self.vm2_name:
self.logger.info("Deleting instance %s..." % server.name)
server.delete()
- def boot_vm(self, vmname, image_id, flavor, network_id, test_ip, sg_id):
- config = dict()
- config['name'] = vmname
- config['flavor'] = flavor
- config['image'] = image_id
- config['nics'] = [{"net-id": network_id}]
- self.boot_vm_preparation(config, vmname, test_ip)
- self.logger.info("Creating instance '%s'..." % vmname)
- self.logger.debug("Configuration: %s" % config)
- vm = self.nova_client.servers.create(**config)
-
- # wait until VM status is active
- if not self.waitVmActive(self.nova_client, vm):
- vm_status = os_utils.get_instance_status(self.nova_client, vm)
- self.logger.error("Instance '%s' cannot be booted. Status is '%s'"
- % (vmname, vm_status))
- return None
- else:
- self.logger.info("Instance '%s' is ACTIVE." % vmname)
-
- self.add_secgroup(vmname, vm.id, sg_id)
-
- return vm
-
- def waitVmActive(self, nova, vm):
- # sleep and wait for VM status change
- sleep_time = 3
- count = self.vm_boot_timeout / sleep_time
- while True:
- status = os_utils.get_instance_status(nova, vm)
- self.logger.debug("Status: %s" % status)
- if status == "ACTIVE":
- return True
- if status == "ERROR" or status == "error":
- return False
- if count == 0:
- self.logger.debug("Booting a VM timed out...")
- return False
- count -= 1
- time.sleep(sleep_time)
- return False
-
- def add_secgroup(self, vmname, vm_id, sg_id):
- self.logger.info("Adding '%s' to security group '%s'..." %
- (vmname, self.sg_name))
- os_utils.add_secgroup_to_instance(self.nova_client, vm_id, sg_id)
-
- def get_test_ip(self, vm):
- test_ip = vm.networks.get(self.private_net_name)[0]
- self.logger.debug("Instance '%s' got %s" % (vm.name, test_ip))
- return test_ip
-
- def parse_result(self, code, start_time, stop_time):
- test_status = "FAIL"
- if code == 0:
- self.logger.info("vPing OK")
- duration = round(stop_time - start_time, 1)
- self.logger.info("vPing duration:'%s'" % duration)
- test_status = "PASS"
- elif code == -2:
- duration = 0
- self.logger.info("Userdata is not supported in nova boot. "
- "Aborting test...")
- else:
- duration = 0
- self.logger.error("vPing FAILED")
-
- self.details = {'timestart': start_time,
- 'duration': duration,
- 'status': test_status}
- self.result = test_status
-
- @staticmethod
- def pMsg(msg):
- """pretty printing"""
- pprint.PrettyPrinter(indent=4).pprint(msg)
-
class VPingMain(object):
+
def __init__(self, vping_cls):
self.vping = vping_cls()
@@ -291,6 +204,6 @@ class VPingMain(object):
if result != VPingBase.EX_OK:
return result
if kwargs['report']:
- return self.vping.push_to_db()
- except Exception:
+ return self.vping.publish_report()
+ except:
return VPingBase.EX_RUN_ERROR
diff --git a/functest/opnfv_tests/openstack/vping/vping_ssh.py b/functest/opnfv_tests/openstack/vping/vping_ssh.py
index e87da3636..0ad77402e 100755
--- a/functest/opnfv_tests/openstack/vping/vping_ssh.py
+++ b/functest/opnfv_tests/openstack/vping/vping_ssh.py
@@ -7,124 +7,161 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
-import logging
+import argparse
import os
-import re
+from scp import SCPClient
import sys
import time
-import argparse
-import paramiko
-from scp import SCPClient
-
-import functest.utils.openstack_utils as os_utils
+from snaps.openstack.create_instance import FloatingIpSettings, \
+ VmInstanceSettings
+from snaps.openstack.create_keypairs import KeypairSettings
+from snaps.openstack.create_network import PortSettings
+from snaps.openstack.create_router import RouterSettings
+from snaps.openstack.create_security_group import Direction, Protocol, \
+ SecurityGroupSettings, SecurityGroupRuleSettings
+from snaps.openstack.utils import deploy_utils
+
+from functest.core.testcase import TestCase
+from functest.opnfv_tests.openstack.snaps import snaps_utils
+from functest.utils.constants import CONST
import vping_base
-import functest.core.testcase as testcase
class VPingSSH(vping_base.VPingBase):
+ """
+ Class to execute the vPing test using a Floating IP to connect to one VM
+ to issue the ping command to the second
+ """
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_ssh"
super(VPingSSH, self).__init__(**kwargs)
- self.logger = logging.getLogger(__name__)
-
- def do_vping(self, vm, test_ip):
- floatip = self.add_float_ip(vm)
- if not floatip:
- return testcase.TestCase.EX_RUN_ERROR
- ssh = self.establish_ssh(vm, floatip)
- if not ssh:
- return testcase.TestCase.EX_RUN_ERROR
- if not self.transfer_ping_script(ssh, floatip):
- return testcase.TestCase.EX_RUN_ERROR
- return self.do_vping_ssh(ssh, test_ip)
-
- def add_float_ip(self, vm):
- self.logger.info("Creating floating IP for VM '%s'..." % self.vm2_name)
- floatip_dic = os_utils.create_floating_ip(self.neutron_client)
- floatip = floatip_dic['fip_addr']
-
- if floatip is None:
- self.logger.error("Cannot create floating IP.")
- return None
- self.logger.info("Floating IP created: '%s'" % floatip)
-
- self.logger.info("Associating floating ip: '%s' to VM '%s' "
- % (floatip, self.vm2_name))
- if not os_utils.add_floating_ip(self.nova_client, vm.id, floatip):
- self.logger.error("Cannot associate floating IP to VM.")
- return None
-
- return floatip
-
- def establish_ssh(self, vm, floatip):
- self.logger.info("Trying to establish SSH connection to %s..."
- % floatip)
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- timeout = 50
- nolease = False
- got_ip = False
- discover_count = 0
- cidr_first_octet = self.private_subnet_cidr.split('.')[0]
- while timeout > 0:
- try:
- ssh.connect(floatip, username=self.image_username,
- password=self.image_password, timeout=2)
- self.logger.debug("SSH connection established to %s."
- % floatip)
- break
- except:
- self.logger.debug("Waiting for %s..." % floatip)
- time.sleep(6)
- timeout -= 1
-
- console_log = vm.get_console_output()
-
- # print each "Sending discover" captured on the console log
- if (len(re.findall("Sending discover", console_log)) >
- discover_count and not got_ip):
- discover_count += 1
- self.logger.debug("Console-log '%s': Sending discover..."
- % self.vm2_name)
-
- # check if eth0 got an ip,the line looks like this:
- # "inet addr:192.168."....
- # if the dhcp agent fails to assing ip, this line will not appear
- if "inet addr:" + cidr_first_octet in console_log and not got_ip:
- got_ip = True
- self.logger.debug("The instance '%s' succeeded to get the IP "
- "from the dhcp agent." % self.vm2_name)
-
- # if dhcp not work,it shows "No lease, failing".The test will fail
- if ("No lease, failing" in console_log and
- not nolease and
- not got_ip):
- nolease = True
- self.logger.debug("Console-log '%s': No lease, failing..."
- % self.vm2_name)
- self.logger.info("The instance failed to get an IP from DHCP "
- "agent. The test will probably timeout...")
-
- if timeout == 0: # 300 sec timeout (5 min)
- self.logger.error("Cannot establish connection to IP '%s'. "
- "Aborting" % floatip)
- return None
- return ssh
-
- def transfer_ping_script(self, ssh, floatip):
- self.logger.info("Trying to transfer ping.sh to %s..." % floatip)
+
+ self.kp_name = CONST.__getattribute__('vping_keypair_name') + self.guid
+ self.kp_priv_file = CONST.__getattribute__('vping_keypair_priv_file')
+ self.kp_pub_file = CONST.__getattribute__('vping_keypair_pub_file')
+ self.router_name =\
+ CONST.__getattribute__('vping_router_name') + self.guid
+ self.sg_name = CONST.__getattribute__('vping_sg_name') + self.guid
+ self.sg_desc = CONST.__getattribute__('vping_sg_desc')
+
+ self.ext_net_name = snaps_utils.get_ext_net_name(self.os_creds)
+
+ def run(self):
+ """
+ Sets up the OpenStack keypair, router, security group, and VM instance
+ objects then validates the ping.
+ :return: the exit code from the super.execute() method
+ """
+ try:
+ super(VPingSSH, self).run()
+
+ self.logger.info("Creating keypair with name: '%s'" % self.kp_name)
+ kp_creator = deploy_utils.create_keypair(
+ self.os_creds,
+ KeypairSettings(name=self.kp_name,
+ private_filepath=self.kp_priv_file,
+ public_filepath=self.kp_pub_file))
+ self.creators.append(kp_creator)
+
+ # Creating router to external network
+ self.logger.info("Creating router with name: '%s'"
+ % self.router_name)
+ net_set = self.network_creator.network_settings
+ sub_set = [net_set.subnet_settings[0].name]
+ router_creator = deploy_utils.create_router(
+ self.os_creds,
+ RouterSettings(
+ name=self.router_name,
+ external_gateway=self.ext_net_name,
+ internal_subnets=sub_set))
+ self.creators.append(router_creator)
+
+ # Creating Instance 1
+ port1_settings = PortSettings(
+ name=self.vm1_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance1_settings = VmInstanceSettings(
+ name=self.vm1_name, flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ vm_delete_timeout=self.vm_delete_timeout,
+ ssh_connect_timeout=self.vm_ssh_connect_timeout,
+ port_settings=[port1_settings])
+
+ self.logger.info(
+ "Creating VM 1 instance with name: '%s'"
+ % instance1_settings.name)
+ self.vm1_creator = deploy_utils.create_vm_instance(
+ self.os_creds,
+ instance1_settings,
+ self.image_creator.image_settings,
+ keypair_creator=kp_creator)
+ self.creators.append(self.vm1_creator)
+
+ # Creating Instance 2
+ sg_creator = self.__create_security_group()
+ self.creators.append(sg_creator)
+
+ port2_settings = PortSettings(
+ name=self.vm2_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance2_settings = VmInstanceSettings(
+ name=self.vm2_name, flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ vm_delete_timeout=self.vm_delete_timeout,
+ ssh_connect_timeout=self.vm_ssh_connect_timeout,
+ port_settings=[port2_settings],
+ security_group_names=[sg_creator.sec_grp_settings.name],
+ floating_ip_settings=[FloatingIpSettings(
+ name=self.vm2_name + '-FIPName',
+ port_name=port2_settings.name,
+ router_name=router_creator.router_settings.name)])
+
+ self.logger.info(
+ "Creating VM 2 instance with name: '%s'"
+ % instance2_settings.name)
+ self.vm2_creator = deploy_utils.create_vm_instance(
+ self.os_creds,
+ instance2_settings,
+ self.image_creator.image_settings,
+ keypair_creator=kp_creator)
+ self.creators.append(self.vm2_creator)
+
+ return self._execute()
+ except Exception as e:
+ self.logger.error('Unexpected error running test - ' + e.message)
+ return TestCase.EX_RUN_ERROR
+ finally:
+ self._cleanup()
+
+ def _do_vping(self, vm_creator, test_ip):
+ """
+ Override from super
+ """
+ if vm_creator.vm_ssh_active(block=True):
+ ssh = vm_creator.ssh_client()
+ if not self.__transfer_ping_script(ssh):
+ return TestCase.EX_RUN_ERROR
+ return self.__do_vping_ssh(ssh, test_ip)
+ else:
+ return -1
+
+ def __transfer_ping_script(self, ssh):
+ """
+ Uses SCP to copy the ping script via the SSH client
+ :param ssh: the SSH client
+ :return:
+ """
+ self.logger.info("Trying to transfer ping.sh")
scp = SCPClient(ssh.get_transport())
local_path = self.functest_repo + "/" + self.repo
ping_script = os.path.join(local_path, "ping.sh")
try:
scp.put(ping_script, "~/")
except:
- self.logger.error("Cannot SCP the file '%s' to VM '%s'"
- % (ping_script, floatip))
+ self.logger.error("Cannot SCP the file '%s'" % ping_script)
return False
cmd = 'chmod 755 ~/ping.sh'
@@ -134,8 +171,14 @@ class VPingSSH(vping_base.VPingBase):
return True
- def do_vping_ssh(self, ssh, test_ip):
- EXIT_CODE = -1
+ def __do_vping_ssh(self, ssh, test_ip):
+ """
+ Pings the test_ip via the SSH client
+ :param ssh: the SSH client used to issue the ping command
+ :param test_ip: the IP for the ping command to use
+ :return: exit_code (int)
+ """
+ exit_code = TestCase.EX_TESTCASE_FAILED
self.logger.info("Waiting for ping...")
sec = 0
@@ -150,7 +193,7 @@ class VPingSSH(vping_base.VPingBase):
for line in output:
if "vPing OK" in line:
self.logger.info("vPing detected!")
- EXIT_CODE = 0
+ exit_code = TestCase.EX_OK
flag = True
break
@@ -162,11 +205,38 @@ class VPingSSH(vping_base.VPingBase):
break
self.logger.debug("Pinging %s. Waiting for response..." % test_ip)
sec += 1
- return EXIT_CODE
+ return exit_code
+
+ def __create_security_group(self):
+ """
+ Configures and deploys an OpenStack security group object
+ :return: the creator object
+ """
+ sg_rules = list()
+ sg_rules.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sg_name,
+ direction=Direction.ingress,
+ protocol=Protocol.icmp))
+ sg_rules.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sg_name,
+ direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22,
+ port_range_max=22))
+ sg_rules.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sg_name,
+ direction=Direction.egress,
+ protocol=Protocol.tcp, port_range_min=22,
+ port_range_max=22))
+
+ self.logger.info("Security group with name: '%s'" % self.sg_name)
+ return deploy_utils.create_security_group(self.os_creds,
+ SecurityGroupSettings(
+ name=self.sg_name,
+ description=self.sg_desc,
+ rule_settings=sg_rules))
if __name__ == '__main__':
- logging.basicConfig()
args_parser = argparse.ArgumentParser()
args_parser.add_argument("-r", "--report",
help="Create json result file",
diff --git a/functest/opnfv_tests/openstack/vping/vping_userdata.py b/functest/opnfv_tests/openstack/vping/vping_userdata.py
index 05dda9dea..8ea9be84b 100755
--- a/functest/opnfv_tests/openstack/vping/vping_userdata.py
+++ b/functest/opnfv_tests/openstack/vping/vping_userdata.py
@@ -7,74 +7,144 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
-import logging
+import argparse
import sys
import time
-import argparse
+from functest.core.testcase import TestCase
+
+from snaps.openstack.utils import deploy_utils
+from snaps.openstack.create_instance import VmInstanceSettings
+from snaps.openstack.create_network import PortSettings
import vping_base
class VPingUserdata(vping_base.VPingBase):
+ """
+ Class to execute the vPing test using userdata and the VM's console
+ """
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_userdata"
super(VPingUserdata, self).__init__(**kwargs)
- self.logger = logging.getLogger(__name__)
-
- def boot_vm_preparation(self, config, vmname, test_ip):
- config['config_drive'] = True
- if vmname == self.vm2_name:
- u = ("#!/bin/sh\n\n"
- "while true; do\n"
- " ping -c 1 %s 2>&1 >/dev/null\n"
- " RES=$?\n"
- " if [ \"Z$RES\" = \"Z0\" ] ; then\n"
- " echo 'vPing OK'\n"
- " break\n"
- " else\n"
- " echo 'vPing KO'\n"
- " fi\n"
- " sleep 1\n"
- "done\n" % test_ip)
- config['userdata'] = u
-
- def do_vping(self, vm, test_ip):
+
+ def run(self):
+ """
+ Sets up the OpenStack VM instance objects then executes the ping and
+ validates.
+ :return: the exit code from the super.execute() method
+ """
+ try:
+ super(VPingUserdata, self).run()
+
+ # Creating Instance 1
+ port1_settings = PortSettings(
+ name=self.vm1_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance1_settings = VmInstanceSettings(
+ name=self.vm1_name,
+ flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ port_settings=[port1_settings])
+
+ self.logger.info(
+ "Creating VM 1 instance with name: '%s'"
+ % instance1_settings.name)
+ self.vm1_creator = deploy_utils.create_vm_instance(
+ self.os_creds, instance1_settings,
+ self.image_creator.image_settings)
+ self.creators.append(self.vm1_creator)
+
+ userdata = _get_userdata(
+ self.vm1_creator.get_port_ip(port1_settings.name))
+ if userdata:
+ # Creating Instance 2
+ port2_settings = PortSettings(
+ name=self.vm2_name + '-vPingPort',
+ network_name=self.network_creator.network_settings.name)
+ instance2_settings = VmInstanceSettings(
+ name=self.vm2_name,
+ flavor=self.flavor_name,
+ vm_boot_timeout=self.vm_boot_timeout,
+ port_settings=[port2_settings],
+ userdata=userdata)
+
+ self.logger.info(
+ "Creating VM 2 instance with name: '%s'"
+ % instance2_settings.name)
+ self.vm2_creator = deploy_utils.create_vm_instance(
+ self.os_creds, instance2_settings,
+ self.image_creator.image_settings)
+ self.creators.append(self.vm2_creator)
+ else:
+ raise Exception('Userdata is None')
+
+ return self._execute()
+
+ finally:
+ self._cleanup()
+
+ def _do_vping(self, vm_creator, test_ip):
+ """
+ Override from super
+ """
self.logger.info("Waiting for ping...")
- EXIT_CODE = -1
+ exit_code = -1
sec = 0
tries = 0
while True:
time.sleep(1)
- p_console = vm.get_console_output()
+ p_console = vm_creator.get_vm_inst().get_console_output()
if "vPing OK" in p_console:
self.logger.info("vPing detected!")
- EXIT_CODE = 0
+ exit_code = TestCase.EX_OK
break
elif "failed to read iid from metadata" in p_console or tries > 5:
- EXIT_CODE = -2
+ exit_code = TestCase.EX_TESTCASE_FAILED
break
elif sec == self.ping_timeout:
self.logger.info("Timeout reached.")
break
elif sec % 10 == 0:
if "request failed" in p_console:
- self.logger.debug("It seems userdata is not supported "
- "in nova boot. Waiting a bit...")
+ self.logger.debug(
+ "It seems userdata is not supported in nova boot. " +
+ "Waiting a bit...")
tries += 1
else:
- self.logger.debug("Pinging %s. Waiting for response..."
- % test_ip)
+ self.logger.debug(
+ "Pinging %s. Waiting for response..." % test_ip)
sec += 1
- return EXIT_CODE
+ return exit_code
+
+
+def _get_userdata(test_ip):
+ """
+ Returns the post VM creation script to be added into the VM's userdata
+ :param test_ip: the IP value to substitute into the script
+ :return: the bash script contents
+ """
+ if test_ip:
+ return ("#!/bin/sh\n\n"
+ "while true; do\n"
+ " ping -c 1 %s 2>&1 >/dev/null\n"
+ " RES=$?\n"
+ " if [ \"Z$RES\" = \"Z0\" ] ; then\n"
+ " echo 'vPing OK'\n"
+ " break\n"
+ " else\n"
+ " echo 'vPing KO'\n"
+ " fi\n"
+ " sleep 1\n"
+ "done\n" % test_ip)
+ return None
if __name__ == '__main__':
- logging.basicConfig()
args_parser = argparse.ArgumentParser()
args_parser.add_argument("-r", "--report",
help="Create json result file",
diff --git a/functest/opnfv_tests/sdn/odl/odl.py b/functest/opnfv_tests/sdn/odl/odl.py
index f92cb95da..6f586b7a0 100755
--- a/functest/opnfv_tests/sdn/odl/odl.py
+++ b/functest/opnfv_tests/sdn/odl/odl.py
@@ -16,6 +16,8 @@ Example:
$ python odl.py
"""
+from __future__ import division
+
import argparse
import errno
import fileinput
@@ -23,12 +25,12 @@ import logging
import os
import re
import sys
-import urlparse
import robot.api
from robot.errors import RobotError
import robot.run
from robot.utils.robottime import timestamp_to_secs
+from six.moves import urllib
from functest.core import testcase
import functest.utils.openstack_utils as op_utils
@@ -85,10 +87,10 @@ class ODLTests(testcase.TestCase):
try:
for line in fileinput.input(odl_variables_files,
inplace=True):
- print re.sub("AUTH = .*",
+ print(re.sub("AUTH = .*",
("AUTH = [u'" + odlusername + "', u'" +
odlpassword + "']"),
- line.rstrip())
+ line.rstrip()))
return True
except Exception as ex: # pylint: disable=broad-except
cls.__logger.error("Cannot set ODL creds: %s", str(ex))
@@ -100,7 +102,12 @@ class ODLTests(testcase.TestCase):
result = robot.api.ExecutionResult(xml_file)
visitor = ODLResultVisitor()
result.visit(visitor)
- self.result = result.suite.status
+ try:
+ self.result = 100 * (
+ result.suite.statistics.critical.passed /
+ result.suite.statistics.critical.total)
+ except ZeroDivisionError:
+ self.__logger.error("No test has been ran")
self.start_time = timestamp_to_secs(result.suite.starttime)
self.stop_time = timestamp_to_secs(result.suite.endtime)
self.details = {}
@@ -143,7 +150,7 @@ class ODLTests(testcase.TestCase):
odlusername = kwargs['odlusername']
odlpassword = kwargs['odlpassword']
osauthurl = kwargs['osauthurl']
- keystoneip = urlparse.urlparse(osauthurl).hostname
+ keystoneip = urllib.parse.urlparse(osauthurl).hostname
variables = ['KEYSTONE:' + keystoneip,
'NEUTRON:' + kwargs['neutronip'],
'OS_AUTH_URL:"' + osauthurl + '"',
@@ -211,7 +218,7 @@ class ODLTests(testcase.TestCase):
except KeyError:
pass
neutron_url = op_utils.get_endpoint(service_type='network')
- kwargs = {'neutronip': urlparse.urlparse(neutron_url).hostname}
+ kwargs = {'neutronip': urllib.parse.urlparse(neutron_url).hostname}
kwargs['odlip'] = kwargs['neutronip']
kwargs['odlwebport'] = '8080'
kwargs['odlrestconfport'] = '8181'
diff --git a/functest/opnfv_tests/vnf/aaa/aaa.py b/functest/opnfv_tests/vnf/aaa/aaa.py
index 1a484ddfb..0030256c2 100755
--- a/functest/opnfv_tests/vnf/aaa/aaa.py
+++ b/functest/opnfv_tests/vnf/aaa/aaa.py
@@ -13,10 +13,10 @@ import sys
import argparse
import functest.core.testcase as testcase
-import functest.core.vnf_base as vnf_base
+import functest.core.vnf as vnf
-class AaaVnf(vnf_base.VnfOnBoardingBase):
+class AaaVnf(vnf.VnfOnBoarding):
logger = logging.getLogger(__name__)
diff --git a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
index f3bb30126..42d31e3bb 100644
--- a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
+++ b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py
@@ -13,12 +13,12 @@ import shutil
import requests
-import functest.core.vnf_base as vnf_base
+import functest.core.vnf as vnf
from functest.utils.constants import CONST
import functest.utils.functest_utils as ft_utils
-class ClearwaterOnBoardingBase(vnf_base.VnfOnBoardingBase):
+class ClearwaterOnBoardingBase(vnf.VnfOnBoarding):
def __init__(self, **kwargs):
self.logger = logging.getLogger(__name__)
diff --git a/functest/opnfv_tests/vnf/ims/orchestra_ims.py b/functest/opnfv_tests/vnf/ims/orchestra_ims.py
index 95751d47f..6f341970d 100755
--- a/functest/opnfv_tests/vnf/ims/orchestra_ims.py
+++ b/functest/opnfv_tests/vnf/ims/orchestra_ims.py
@@ -9,15 +9,15 @@
import json
import logging
+import os
import socket
import sys
import time
import yaml
-import functest.core.vnf_base as vnf_base
+import functest.core.vnf as vnf
import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
-import os
from functest.utils.constants import CONST
from org.openbaton.cli.agents.agents import MainAgent
@@ -76,7 +76,7 @@ def servertest(host, port):
return True
-class ImsVnf(vnf_base.VnfOnBoardingBase):
+class ImsVnf(vnf.VnfOnBoarding):
def __init__(self, project='functest', case_name='orchestra_ims',
repo='', cmd=''):
diff --git a/functest/tests/unit/ci/test_generate_report.py b/functest/tests/unit/ci/test_generate_report.py
deleted file mode 100644
index 13361c1d1..000000000
--- a/functest/tests/unit/ci/test_generate_report.py
+++ /dev/null
@@ -1,129 +0,0 @@
-#!/usr/bin/env python
-
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-import logging
-import unittest
-import urllib2
-
-import mock
-
-from functest.ci import generate_report as gen_report
-from functest.tests.unit import test_utils
-from functest.utils import functest_utils as ft_utils
-from functest.utils.constants import CONST
-
-
-class GenerateReportTesting(unittest.TestCase):
-
- logging.disable(logging.CRITICAL)
-
- def test_init(self):
- test_array = gen_report.init()
- self.assertEqual(test_array, [])
-
- @mock.patch('functest.ci.generate_report.urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
- def test_get_results_from_db_fail(self, mock_method):
- url = "%s/results?build_tag=%s" % (ft_utils.get_db_url(),
- CONST.__getattribute__('BUILD_TAG'))
- self.assertIsNone(gen_report.get_results_from_db())
- mock_method.assert_called_once_with(url)
-
- @mock.patch('functest.ci.generate_report.urllib2.urlopen',
- return_value={'results': []})
- def test_get_results_from_db_success(self, mock_method):
- url = "%s/results?build_tag=%s" % (ft_utils.get_db_url(),
- CONST.__getattribute__('BUILD_TAG'))
- self.assertEqual(gen_report.get_results_from_db(), None)
- mock_method.assert_called_once_with(url)
-
- def test_get_data(self):
- self.assertIsInstance(gen_report.get_data({'result': ''}, ''), dict)
-
- def test_print_line_with_ci_run(self):
- CONST.__setattr__('IS_CI_RUN', True)
- w1 = 'test_print_line'
- test_str = ("| %s| %s| %s| %s| %s|\n"
- % (w1.ljust(gen_report.COL_1_LEN - 1),
- ''.ljust(gen_report.COL_2_LEN - 1),
- ''.ljust(gen_report.COL_3_LEN - 1),
- ''.ljust(gen_report.COL_4_LEN - 1),
- ''.ljust(gen_report.COL_5_LEN - 1)))
- self.assertEqual(gen_report.print_line(w1), test_str)
-
- def test_print_line_without_ci_run(self):
- CONST.__setattr__('IS_CI_RUN', False)
- w1 = 'test_print_line'
- test_str = ("| %s| %s| %s| %s|\n"
- % (w1.ljust(gen_report.COL_1_LEN - 1),
- ''.ljust(gen_report.COL_2_LEN - 1),
- ''.ljust(gen_report.COL_3_LEN - 1),
- ''.ljust(gen_report.COL_4_LEN - 1)))
- self.assertEqual(gen_report.print_line(w1), test_str)
-
- def test_print_line_no_column_with_ci_run(self):
- CONST.__setattr__('IS_CI_RUN', True)
- TOTAL_LEN = gen_report.COL_1_LEN + gen_report.COL_2_LEN
- TOTAL_LEN += gen_report.COL_3_LEN + gen_report.COL_4_LEN + 2
- TOTAL_LEN += gen_report.COL_5_LEN + 1
- test_str = ("| %s|\n" % 'test'.ljust(TOTAL_LEN))
- self.assertEqual(gen_report.print_line_no_columns('test'), test_str)
-
- def test_print_line_no_column_without_ci_run(self):
- CONST.__setattr__('IS_CI_RUN', False)
- TOTAL_LEN = gen_report.COL_1_LEN + gen_report.COL_2_LEN
- TOTAL_LEN += gen_report.COL_3_LEN + gen_report.COL_4_LEN + 2
- test_str = ("| %s|\n" % 'test'.ljust(TOTAL_LEN))
- self.assertEqual(gen_report.print_line_no_columns('test'), test_str)
-
- def test_print_separator_with_ci_run(self):
- CONST.__setattr__('IS_CI_RUN', True)
- test_str = ("+" + "=" * gen_report.COL_1_LEN +
- "+" + "=" * gen_report.COL_2_LEN +
- "+" + "=" * gen_report.COL_3_LEN +
- "+" + "=" * gen_report.COL_4_LEN +
- "+" + "=" * gen_report.COL_5_LEN)
- test_str += '+\n'
- self.assertEqual(gen_report.print_separator(), test_str)
-
- def test_print_separator_without_ci_run(self):
- CONST.__setattr__('IS_CI_RUN', False)
- test_str = ("+" + "=" * gen_report.COL_1_LEN +
- "+" + "=" * gen_report.COL_2_LEN +
- "+" + "=" * gen_report.COL_3_LEN +
- "+" + "=" * gen_report.COL_4_LEN)
- test_str += "+\n"
- self.assertEqual(gen_report.print_separator(), test_str)
-
- @mock.patch('functest.ci.generate_report.logger.info')
- def test_main_with_ci_run(self, mock_method):
- CONST.__setattr__('IS_CI_RUN', True)
- gen_report.main()
- mock_method.assert_called_once_with(test_utils.SubstrMatch('URL'))
-
- @mock.patch('functest.ci.generate_report.logger.info')
- def test_main_with_ci_loop(self, mock_method):
- CONST.__setattr__('CI_LOOP', 'daily')
- gen_report.main()
- mock_method.assert_called_once_with(test_utils.SubstrMatch('CI LOOP'))
-
- @mock.patch('functest.ci.generate_report.logger.info')
- def test_main_with_scenario(self, mock_method):
- CONST.__setattr__('DEPLOY_SCENARIO', 'test_scenario')
- gen_report.main()
- mock_method.assert_called_once_with(test_utils.SubstrMatch('SCENARIO'))
-
- @mock.patch('functest.ci.generate_report.logger.info')
- def test_main_with_build_tag(self, mock_method):
- CONST.__setattr__('BUILD_TAG', 'test_build_tag')
- gen_report.main()
- mock_method.assert_called_once_with(test_utils.
- SubstrMatch('BUILD TAG'))
-
-
-if __name__ == "__main__":
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_run_tests.py b/functest/tests/unit/ci/test_run_tests.py
index ef08282aa..d0052392f 100644
--- a/functest/tests/unit/ci/test_run_tests.py
+++ b/functest/tests/unit/ci/test_run_tests.py
@@ -70,17 +70,6 @@ class RunTestsTesting(unittest.TestCase):
run_tests.cleanup()
self.assertTrue(mock_os_clean.called)
- def test_update_test_info(self):
- run_tests.GlobalVariables.EXECUTED_TEST_CASES = [self.test]
- run_tests.update_test_info('test_name',
- 'test_result',
- 'test_duration')
- exp = self.test
- exp.update({"result": 'test_result',
- "duration": 'test_duration'})
- self.assertEqual(run_tests.GlobalVariables.EXECUTED_TEST_CASES,
- [exp])
-
def test_get_run_dict_if_defined_default(self):
mock_obj = mock.Mock()
with mock.patch('functest.ci.run_tests.'
@@ -148,10 +137,8 @@ class RunTestsTesting(unittest.TestCase):
mock.patch('functest.ci.run_tests.source_rc_file'), \
mock.patch('functest.ci.run_tests.generate_os_snapshot'), \
mock.patch('functest.ci.run_tests.cleanup'), \
- mock.patch('functest.ci.run_tests.update_test_info'), \
mock.patch('functest.ci.run_tests.get_run_dict',
return_value=test_run_dict), \
- mock.patch('functest.ci.run_tests.generate_report.main'), \
self.assertRaises(run_tests.BlockingTestFailed) as context:
run_tests.GlobalVariables.CLEAN_FLAG = True
run_tests.run_test(mock_test, 'tier_name')
@@ -176,21 +163,17 @@ class RunTestsTesting(unittest.TestCase):
@mock.patch('functest.ci.run_tests.logger.info')
def test_run_all_default(self, mock_logger_info):
- with mock.patch('functest.ci.run_tests.run_tier') as mock_method, \
- mock.patch('functest.ci.run_tests.generate_report.init'), \
- mock.patch('functest.ci.run_tests.generate_report.main'):
+ with mock.patch('functest.ci.run_tests.run_tier') as mock_method:
CONST.__setattr__('CI_LOOP', 'test_ci_loop')
run_tests.run_all(self.tiers)
mock_method.assert_any_call(self.tier)
self.assertTrue(mock_logger_info.called)
@mock.patch('functest.ci.run_tests.logger.info')
- def test_run_all__missing_tier(self, mock_logger_info):
- with mock.patch('functest.ci.run_tests.generate_report.init'), \
- mock.patch('functest.ci.run_tests.generate_report.main'):
- CONST.__setattr__('CI_LOOP', 'loop_re_not_available')
- run_tests.run_all(self.tiers)
- self.assertTrue(mock_logger_info.called)
+ def test_run_all_missing_tier(self, mock_logger_info):
+ CONST.__setattr__('CI_LOOP', 'loop_re_not_available')
+ run_tests.run_all(self.tiers)
+ self.assertTrue(mock_logger_info.called)
def test_main_failed(self):
kwargs = {'test': 'test_name', 'noclean': True, 'report': True}
@@ -221,7 +204,6 @@ class RunTestsTesting(unittest.TestCase):
with mock.patch('functest.ci.run_tests.tb.TierBuilder',
return_value=mock_obj), \
mock.patch('functest.ci.run_tests.source_rc_file'), \
- mock.patch('functest.ci.run_tests.generate_report.init'), \
mock.patch('functest.ci.run_tests.run_tier') as m:
self.assertEqual(run_tests.main(**kwargs),
run_tests.Result.EX_OK)
@@ -234,7 +216,6 @@ class RunTestsTesting(unittest.TestCase):
with mock.patch('functest.ci.run_tests.tb.TierBuilder',
return_value=mock_obj), \
mock.patch('functest.ci.run_tests.source_rc_file'), \
- mock.patch('functest.ci.run_tests.generate_report.init'), \
mock.patch('functest.ci.run_tests.run_test') as m:
self.assertEqual(run_tests.main(**kwargs),
run_tests.Result.EX_OK)
@@ -248,7 +229,6 @@ class RunTestsTesting(unittest.TestCase):
with mock.patch('functest.ci.run_tests.tb.TierBuilder',
return_value=mock_obj), \
mock.patch('functest.ci.run_tests.source_rc_file'), \
- mock.patch('functest.ci.run_tests.generate_report.init'), \
mock.patch('functest.ci.run_tests.run_all') as m:
self.assertEqual(run_tests.main(**kwargs),
run_tests.Result.EX_OK)
@@ -262,7 +242,6 @@ class RunTestsTesting(unittest.TestCase):
with mock.patch('functest.ci.run_tests.tb.TierBuilder',
return_value=mock_obj), \
mock.patch('functest.ci.run_tests.source_rc_file'), \
- mock.patch('functest.ci.run_tests.generate_report.init'), \
mock.patch('functest.ci.run_tests.logger.debug') as m:
self.assertEqual(run_tests.main(**kwargs),
run_tests.Result.EX_ERROR)
diff --git a/functest/tests/unit/ci/test_tier_builder.py b/functest/tests/unit/ci/test_tier_builder.py
index 438fa7c25..feaf33a81 100644
--- a/functest/tests/unit/ci/test_tier_builder.py
+++ b/functest/tests/unit/ci/test_tier_builder.py
@@ -22,6 +22,7 @@ class TierBuilderTesting(unittest.TestCase):
'scenario': 'test_scenario'}
self.testcase = {'dependencies': self.dependency,
+ 'enabled': 'true',
'case_name': 'test_name',
'criteria': 'test_criteria',
'blocking': 'test_blocking',
@@ -78,6 +79,13 @@ class TierBuilderTesting(unittest.TestCase):
self.assertEqual(self.tierbuilder.get_tests('test_tier2'),
None)
+ def test_get_tier_name_ok(self):
+ self.assertEqual(self.tierbuilder.get_tier_name('test_name'),
+ 'test_tier')
+
+ def test_get_tier_name_ko(self):
+ self.assertEqual(self.tierbuilder.get_tier_name('test_name2'), None)
+
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_tier_handler.py b/functest/tests/unit/ci/test_tier_handler.py
index 21df4098b..280062743 100644
--- a/functest/tests/unit/ci/test_tier_handler.py
+++ b/functest/tests/unit/ci/test_tier_handler.py
@@ -32,6 +32,7 @@ class TierHandlerTesting(unittest.TestCase):
'test_ci_loop',
description='test_desc')
self.testcase = tier_handler.TestCase('test_name',
+ 'true',
self.mock_depend,
'test_criteria',
'test_blocking',
@@ -116,6 +117,10 @@ class TierHandlerTesting(unittest.TestCase):
self.assertEqual(self.tier.get_name(),
'test_tier')
+ def test_testcase_is_enabled(self):
+ self.assertEqual(self.testcase.is_enabled(),
+ 'true')
+
def test_testcase_get_criteria(self):
self.assertEqual(self.tier.get_order(),
'test_order')
diff --git a/functest/tests/unit/core/test_pytest_suite_runner.py b/functest/tests/unit/core/test_pytest_suite_runner.py
new file mode 100644
index 000000000..15e5bd73b
--- /dev/null
+++ b/functest/tests/unit/core/test_pytest_suite_runner.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from functest.core import pytest_suite_runner
+from functest.core import testcase
+
+
+class PyTestSuiteRunnerTesting(unittest.TestCase):
+
+ logging.disable(logging.CRITICAL)
+
+ def setUp(self):
+ self.psrunner = pytest_suite_runner.PyTestSuiteRunner()
+ self.result = mock.Mock()
+ attrs = {'errors': [('test1', 'error_msg1')],
+ 'failures': [('test2', 'failure_msg1')]}
+ self.result.configure_mock(**attrs)
+
+ self.pass_results = mock.Mock()
+ attrs = {'errors': None,
+ 'failures': None}
+ self.pass_results.configure_mock(**attrs)
+
+ def test_run(self):
+ self.psrunner.case_name = 'test_case_name'
+ with mock.patch('functest.core.pytest_suite_runner.'
+ 'unittest.TextTestRunner.run',
+ return_value=self.result):
+ self.assertEqual(self.psrunner.run(),
+ testcase.TestCase.EX_OK)
+
+ with mock.patch('functest.core.pytest_suite_runner.'
+ 'unittest.TextTestRunner.run',
+ return_value=self.pass_results):
+ self.assertEqual(self.psrunner.run(),
+ testcase.TestCase.EX_OK)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_testcase.py b/functest/tests/unit/core/test_testcase.py
index 17329ea32..2adf4a6dc 100644
--- a/functest/tests/unit/core/test_testcase.py
+++ b/functest/tests/unit/core/test_testcase.py
@@ -7,7 +7,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-"""Define the classe required to fully cover testcase."""
+"""Define the class required to fully cover testcase."""
import logging
import unittest
@@ -20,7 +20,6 @@ __author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
class TestCaseTesting(unittest.TestCase):
-
"""The class testing TestCase."""
# pylint: disable=missing-docstring,too-many-public-methods
@@ -189,6 +188,41 @@ class TestCaseTesting(unittest.TestCase):
self.test.stop_time = 180
self.assertEqual(self.test.get_duration(), "02:59")
+ def test_str_project_name_ko(self):
+ self.test.project_name = None
+ self.assertIn("<functest.core.testcase.TestCase object at",
+ str(self.test))
+
+ def test_str_case_name_ko(self):
+ self.test.case_name = None
+ self.assertIn("<functest.core.testcase.TestCase object at",
+ str(self.test))
+
+ def test_str_pass(self):
+ duration = '01:01'
+ with mock.patch.object(self.test, 'get_duration',
+ return_value=duration), \
+ mock.patch.object(self.test, 'is_successful',
+ return_value=testcase.TestCase.EX_OK):
+ message = str(self.test)
+ self.assertIn(self._project_name, message)
+ self.assertIn(self._case_name, message)
+ self.assertIn(duration, message)
+ self.assertIn('PASS', message)
+
+ def test_str_fail(self):
+ duration = '00:59'
+ with mock.patch.object(self.test, 'get_duration',
+ return_value=duration), \
+ mock.patch.object(
+ self.test, 'is_successful',
+ return_value=testcase.TestCase.EX_TESTCASE_FAILED):
+ message = str(self.test)
+ self.assertIn(self._project_name, message)
+ self.assertIn(self._case_name, message)
+ self.assertIn(duration, message)
+ self.assertIn('FAIL', message)
+
if __name__ == "__main__":
unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_vnf.py b/functest/tests/unit/core/test_vnf.py
new file mode 100644
index 000000000..793e95768
--- /dev/null
+++ b/functest/tests/unit/core/test_vnf.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import os
+import unittest
+
+import mock
+
+from functest.core import vnf
+from functest.core import testcase
+
+
+class VnfBaseTesting(unittest.TestCase):
+
+ logging.disable(logging.CRITICAL)
+
+ def setUp(self):
+ self.test = vnf.VnfOnBoarding(
+ project='functest', case_name='aaa')
+ self.test.project = "functest"
+ self.test.start_time = "1"
+ self.test.stop_time = "5"
+ self.test.result = ""
+ self.test.details = {
+ "orchestrator": {"status": "PASS", "result": "", "duration": 20},
+ "vnf": {"status": "PASS", "result": "", "duration": 15},
+ "test_vnf": {"status": "FAIL", "result": "", "duration": 5}}
+ self.test.keystone_client = 'test_client'
+ self.test.tenant_name = 'test_tenant_name'
+
+ def test_execute_deploy_vnf_fail(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=None), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ side_effect=Exception):
+ self.assertEqual(self.test.execute(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_execute_test_vnf_fail(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=None), \
+ mock.patch.object(self.test, 'deploy_vnf'), \
+ mock.patch.object(self.test, 'test_vnf',
+ side_effect=Exception):
+ self.assertEqual(self.test.execute(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ @mock.patch('functest.core.vnf.os_utils.get_tenant_id',
+ return_value='test_tenant_id')
+ @mock.patch('functest.core.vnf.os_utils.delete_tenant',
+ return_value=True)
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_user_id')
+ @mock.patch('functest.core.vnf.os_utils.delete_user',
+ return_value=True)
+ def test_execute_default(self, *args):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=None), \
+ mock.patch.object(self.test, 'deploy_vnf'), \
+ mock.patch.object(self.test, 'test_vnf'), \
+ mock.patch.object(self.test, 'parse_results',
+ return_value='ret_exit_code'), \
+ mock.patch.object(self.test, 'log_results'):
+ self.assertEqual(self.test.execute(),
+ 'ret_exit_code')
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id', return_value='')
+ def test_prepare_missing_userid(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='')
+ def test_prepare_missing_tenantid(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='test_tenantid')
+ @mock.patch('functest.core.vnf.os_utils.get_role_id',
+ return_value='')
+ def test_prepare_missing_roleid(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='test_tenantid')
+ @mock.patch('functest.core.vnf.os_utils.get_role_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.add_role_user',
+ return_value='')
+ def test_prepare_role_add_failure(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_user_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.create_tenant',
+ return_value='test_tenantid')
+ @mock.patch('functest.core.vnf.os_utils.get_role_id',
+ return_value='test_roleid')
+ @mock.patch('functest.core.vnf.os_utils.add_role_user')
+ @mock.patch('functest.core.vnf.os_utils.create_user',
+ return_value='')
+ def test_create_user_failure(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+
+ def test_log_results_default(self):
+ with mock.patch('functest.core.vnf.'
+ 'ft_utils.logger_test_results') \
+ as mock_method:
+ self.test.log_results()
+ self.assertTrue(mock_method.called)
+
+ def test_step_failures_default(self):
+ with self.assertRaises(Exception):
+ self.test.step_failure("error_msg")
+
+ def test_deploy_vnf_unimplemented(self):
+ with self.assertRaises(Exception) as context:
+ self.test.deploy_vnf()
+ self.assertTrue('VNF not deployed' in context.exception)
+
+ def test_test_vnf_unimplemented(self):
+ with self.assertRaises(Exception) as context:
+ self.test.test_vnf()()
+ self.assertTrue('VNF not tested' in context.exception)
+
+ def test_parse_results_ex_ok(self):
+ self.test.details['test_vnf']['status'] = 'PASS'
+ self.assertEqual(self.test.parse_results(), os.EX_OK)
+
+ def test_parse_results_ex_run_error(self):
+ self.test.details['vnf']['status'] = 'FAIL'
+ self.assertEqual(self.test.parse_results(), os.EX_SOFTWARE)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_vnf_base.py b/functest/tests/unit/core/test_vnf_base.py
deleted file mode 100644
index 540cf610d..000000000
--- a/functest/tests/unit/core/test_vnf_base.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-import logging
-import unittest
-
-from functest.core import vnf_base
-
-
-class VnfBaseTesting(unittest.TestCase):
-
- logging.disable(logging.CRITICAL)
-
- def setUp(self):
- self.test = vnf_base.VnfOnBoardingBase(project='functest',
- case_name='aaa')
- self.test.project = "functest"
- self.test.start_time = "1"
- self.test.stop_time = "5"
- self.test.result = ""
- self.test.details = {"orchestrator": {"status": "PASS",
- "result": "",
- "duration": 20},
- "vnf": {"status": "PASS",
- "result": "",
- "duration": 15},
- "test_vnf": {"status": "FAIL",
- "result": "",
- "duration": 5}}
-
- def test_deploy_vnf_unimplemented(self):
- with self.assertRaises(Exception) as context:
- self.test.deploy_vnf()
- self.assertTrue('VNF not deployed' in context.exception)
-
- def test_test_vnf_unimplemented(self):
- with self.assertRaises(Exception) as context:
- self.test.test_vnf()()
- self.assertTrue('VNF not tested' in context.exception)
-
- def test_parse_results(self):
- self.assertNotEqual(self.test.parse_results(), 0)
-
-
-if __name__ == "__main__":
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/energy/__init__.py b/functest/tests/unit/energy/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/functest/tests/unit/energy/__init__.py
diff --git a/functest/tests/unit/energy/test_functest_energy.py b/functest/tests/unit/energy/test_functest_energy.py
new file mode 100644
index 000000000..ffe044bc2
--- /dev/null
+++ b/functest/tests/unit/energy/test_functest_energy.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Unitary test for energy module."""
+# pylint: disable=unused-argument
+import logging
+import unittest
+
+import mock
+
+from functest.energy.energy import EnergyRecorder
+import functest.energy.energy as energy
+
+
+CASE_NAME = "UNIT_test_CASE"
+STEP_NAME = "UNIT_test_STEP"
+
+logging.disable(logging.CRITICAL)
+
+
+class MockHttpResponse(object): # pylint: disable=too-few-public-methods
+ """Mock response for Energy recorder API."""
+
+ def __init__(self, text, status_code):
+ """Create an instance of MockHttpResponse."""
+ self.text = text
+ self.status_code = status_code
+
+
+RECORDER_OK = MockHttpResponse(
+ '{"environment": "UNIT_TEST",'
+ ' "step": "string",'
+ ' "scenario": "' + CASE_NAME + '"}',
+ 200
+)
+RECORDER_KO = MockHttpResponse(
+ '{"message": "An unhandled API exception occurred (MOCK)"}',
+ 500
+)
+
+
+def config_loader_mock(config_key):
+ """Return mocked config values."""
+ if config_key == "energy_recorder.api_url":
+ return "http://pod-uri:8888"
+ elif config_key == "energy_recorder.api_user":
+ return "user"
+ elif config_key == "energy_recorder.api_password":
+ return "password"
+ else:
+ raise Exception("Config not mocked")
+
+
+def config_loader_mock_no_creds(config_key):
+ """Return mocked config values."""
+ if config_key == "energy_recorder.api_url":
+ return "http://pod-uri:8888"
+ elif config_key == "energy_recorder.api_user":
+ return ""
+ elif config_key == "energy_recorder.api_password":
+ return ""
+ else:
+ raise Exception("Config not mocked:" + config_key)
+
+
+class EnergyRecorderTest(unittest.TestCase):
+ """Energy module unitary test suite."""
+
+ case_name = CASE_NAME
+ request_headers = {'content-type': 'application/json'}
+ returned_value_to_preserve = "value"
+ exception_message_to_preserve = "exception_message"
+
+ @mock.patch('functest.energy.energy.requests.post',
+ return_value=RECORDER_OK)
+ def test_start(self, post_mock=None):
+ """EnergyRecorder.start method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.post',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_start_error(self, post_mock=None):
+ """EnergyRecorder.start method (error in method)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.post',
+ return_value=RECORDER_KO)
+ def test_start_api_error(self, post_mock=None):
+ """EnergyRecorder.start method (API error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.post',
+ return_value=RECORDER_OK)
+ def test_set_step(self, post_mock=None):
+ """EnergyRecorder.set_step method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.post',
+ return_value=RECORDER_KO)
+ def test_set_step_api_error(self, post_mock=None):
+ """EnergyRecorder.set_step method (API error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.post',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_set_step_error(self, post_mock=None):
+ """EnergyRecorder.set_step method (method error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.delete',
+ return_value=RECORDER_OK)
+ def test_stop(self, delete_mock=None):
+ """EnergyRecorder.stop method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.delete',
+ return_value=RECORDER_KO)
+ def test_stop_api_error(self, delete_mock=None):
+ """EnergyRecorder.stop method (API Error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers
+ )
+
+ @mock.patch('functest.energy.energy.requests.delete',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_stop_error(self, delete_mock=None):
+ """EnergyRecorder.stop method (method error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers
+ )
+
+ @energy.enable_recording
+ def __decorated_method(self):
+ """Call with to energy recorder decorators."""
+ return self.returned_value_to_preserve
+
+ @energy.enable_recording
+ def __decorated_method_with_ex(self):
+ """Call with to energy recorder decorators."""
+ raise Exception(self.exception_message_to_preserve)
+
+ @mock.patch("functest.energy.energy.EnergyRecorder")
+ @mock.patch("functest.utils.functest_utils.get_pod_name",
+ return_value="MOCK_POD")
+ @mock.patch("functest.utils.functest_utils.get_functest_config",
+ side_effect=config_loader_mock)
+ def test_decorators(self,
+ loader_mock=None,
+ pod_mock=None,
+ recorder_mock=None):
+ """Test energy module decorators."""
+ self.__decorated_method()
+ calls = [mock.call.start(self.case_name),
+ mock.call.stop()]
+ recorder_mock.assert_has_calls(calls)
+
+ def test_decorator_preserve_return(self):
+ """Test that decorator preserve method returned value."""
+ self.test_load_config()
+ self.assertTrue(
+ self.__decorated_method() == self.returned_value_to_preserve
+ )
+
+ def test_decorator_preserve_ex(self):
+ """Test that decorator preserve method exceptions."""
+ self.test_load_config()
+ with self.assertRaises(Exception) as context:
+ self.__decorated_method_with_ex()
+ self.assertTrue(
+ self.exception_message_to_preserve in context.exception
+ )
+
+ @mock.patch("functest.utils.functest_utils.get_functest_config",
+ side_effect=config_loader_mock)
+ @mock.patch("functest.utils.functest_utils.get_pod_name",
+ return_value="MOCK_POD")
+ def test_load_config(self, loader_mock=None, pod_mock=None):
+ """Test load config."""
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["auth"],
+ ("user", "password")
+ )
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["uri"],
+ "http://pod-uri:8888/recorders/environment/MOCK_POD"
+ )
+
+ @mock.patch("functest.utils.functest_utils.get_functest_config",
+ side_effect=config_loader_mock_no_creds)
+ @mock.patch("functest.utils.functest_utils.get_pod_name",
+ return_value="MOCK_POD")
+ def test_load_config_no_creds(self, loader_mock=None, pod_mock=None):
+ """Test load config without creds."""
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api["auth"], None)
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["uri"],
+ "http://pod-uri:8888/recorders/environment/MOCK_POD"
+ )
+
+ @mock.patch("functest.utils.functest_utils.get_functest_config",
+ return_value=None)
+ @mock.patch("functest.utils.functest_utils.get_pod_name",
+ return_value="MOCK_POD")
+ def test_load_config_ex(self, loader_mock=None, pod_mock=None):
+ """Test load config with exception."""
+ with self.assertRaises(AssertionError):
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api, None)
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/odl/test_odl.py b/functest/tests/unit/odl/test_odl.py
index f3d37c650..e2778e249 100644
--- a/functest/tests/unit/odl/test_odl.py
+++ b/functest/tests/unit/odl/test_odl.py
@@ -12,14 +12,14 @@
import errno
import logging
import os
-import StringIO
import unittest
from keystoneauth1.exceptions import auth_plugins
import mock
from robot.errors import DataError, RobotError
-from robot.result import testcase as result_testcase
+from robot.result import model
from robot.utils.robottime import timestamp_to_secs
+import six
from functest.core import testcase
from functest.opnfv_tests.sdn.odl import odl
@@ -49,11 +49,9 @@ class ODLVisitorTesting(unittest.TestCase):
'elapsedtime': 1000,
'text': 'Hello, World!',
'critical': True}
- test = result_testcase.TestCase(name=data['name'],
- status=data['status'],
- message=data['text'],
- starttime=data['starttime'],
- endtime=data['endtime'])
+ test = model.TestCase(
+ name=data['name'], status=data['status'], message=data['text'],
+ starttime=data['starttime'], endtime=data['endtime'])
test.parent = mock.Mock()
config = {'name': data['parent'],
'criticality.test_is_critical.return_value': data[
@@ -109,6 +107,9 @@ class ODLParseResultTesting(ODLTesting):
"""The class testing ODLTests.parse_results()."""
# pylint: disable=missing-docstring
+ _config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000',
+ 'endtime': '20161216 16:00:01.000'}
+
@mock.patch('robot.api.ExecutionResult', side_effect=DataError)
def test_raises_exc(self, mock_method):
with self.assertRaises(DataError):
@@ -116,15 +117,13 @@ class ODLParseResultTesting(ODLTesting):
mock_method.assert_called_once_with(
os.path.join(odl.ODLTests.res_dir, 'output.xml'))
- def test_ok(self):
- config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000',
- 'endtime': '20161216 16:00:01.000', 'status': 'PASS'}
+ def _test_result(self, config, result):
suite = mock.Mock()
suite.configure_mock(**config)
with mock.patch('robot.api.ExecutionResult',
return_value=mock.Mock(suite=suite)):
self.test.parse_results()
- self.assertEqual(self.test.result, config['status'])
+ self.assertEqual(self.test.result, result)
self.assertEqual(self.test.start_time,
timestamp_to_secs(config['starttime']))
self.assertEqual(self.test.stop_time,
@@ -132,6 +131,26 @@ class ODLParseResultTesting(ODLTesting):
self.assertEqual(self.test.details,
{'description': config['name'], 'tests': []})
+ def test_null_passed(self):
+ self._config.update({'statistics.critical.passed': 0,
+ 'statistics.critical.total': 20})
+ self._test_result(self._config, 0)
+
+ def test_no_test(self):
+ self._config.update({'statistics.critical.passed': 20,
+ 'statistics.critical.total': 0})
+ self._test_result(self._config, 0)
+
+ def test_half_success(self):
+ self._config.update({'statistics.critical.passed': 10,
+ 'statistics.critical.total': 20})
+ self._test_result(self._config, 50)
+
+ def test_success(self):
+ self._config.update({'statistics.critical.passed': 20,
+ 'statistics.critical.total': 20})
+ self._test_result(self._config, 100)
+
class ODLRobotTesting(ODLTesting):
@@ -152,7 +171,7 @@ class ODLRobotTesting(ODLTesting):
os.path.join(odl.ODLTests.odl_test_repo,
'csit/variables/Variables.py'), inplace=True)
- @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stdout', new_callable=six.StringIO)
def _test_set_vars(self, msg1, msg2, *args):
line = mock.MagicMock()
line.__iter__.return_value = [msg1]
@@ -170,7 +189,7 @@ class ODLRobotTesting(ODLTesting):
def test_set_vars_auth1(self):
self._test_set_vars("AUTH1 = []", "AUTH1 = []")
- @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stdout', new_callable=six.StringIO)
def test_set_vars_auth_foo(self, *args):
line = mock.MagicMock()
line.__iter__.return_value = ["AUTH = []"]
@@ -558,7 +577,7 @@ class ODLArgParserTesting(ODLTesting):
"--odlip={}".format(self._sdn_controller_ip)]),
self.defaultargs)
- @mock.patch('sys.stderr', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stderr', new_callable=six.StringIO)
def test_fail(self, mock_method):
self.defaultargs['foo'] = 'bar'
with self.assertRaises(SystemExit):
diff --git a/functest/tests/unit/utils/test_decorators.py b/functest/tests/unit/utils/test_decorators.py
new file mode 100644
index 000000000..f8bd9a54f
--- /dev/null
+++ b/functest/tests/unit/utils/test_decorators.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the class required to fully cover decorators."""
+
+from datetime import datetime
+import errno
+import json
+import logging
+import os
+import unittest
+
+import mock
+
+from functest.utils import decorators
+from functest.utils import functest_utils
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+VERSION = 'master'
+DIR = '/dev'
+FILE = '{}/null'.format(DIR)
+URL = 'file://{}'.format(FILE)
+
+
+class DecoratorsTesting(unittest.TestCase):
+ # pylint: disable=missing-docstring
+
+ logging.disable(logging.CRITICAL)
+
+ _case_name = 'base'
+ _project_name = 'functest'
+ _start_time = 1.0
+ _stop_time = 2.0
+ _result = 'PASS'
+ _build_tag = VERSION
+ _node_name = 'bar'
+ _deploy_scenario = 'foo'
+ _installer_type = 'debian'
+
+ def setUp(self):
+ os.environ['INSTALLER_TYPE'] = self._installer_type
+ os.environ['DEPLOY_SCENARIO'] = self._deploy_scenario
+ os.environ['NODE_NAME'] = self._node_name
+ os.environ['BUILD_TAG'] = self._build_tag
+
+ def test_wraps(self):
+ self.assertEqual(functest_utils.push_results_to_db.__name__,
+ "push_results_to_db")
+
+ def _get_json(self):
+ stop_time = datetime.fromtimestamp(self._stop_time).strftime(
+ '%Y-%m-%d %H:%M:%S')
+ start_time = datetime.fromtimestamp(self._start_time).strftime(
+ '%Y-%m-%d %H:%M:%S')
+ data = {'project_name': self._project_name,
+ 'stop_date': stop_time, 'start_date': start_time,
+ 'case_name': self._case_name, 'build_tag': self._build_tag,
+ 'pod_name': self._node_name, 'installer': self._installer_type,
+ 'scenario': self._deploy_scenario, 'version': VERSION,
+ 'details': {}, 'criteria': self._result}
+ return json.dumps(data)
+
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value='http://127.0.0.1')
+ @mock.patch('{}.get_version'.format(functest_utils.__name__),
+ return_value=VERSION)
+ @mock.patch('requests.post')
+ def test_http_shema(self, *args):
+ self.assertTrue(functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ args[1].assert_called_once_with()
+ args[2].assert_called_once_with()
+ args[0].assert_called_once_with(
+ 'http://127.0.0.1', data=self._get_json(),
+ headers={'Content-Type': 'application/json'})
+
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value="/dev/null")
+ def test_wrong_shema(self, mock_method=None):
+ self.assertFalse(functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ mock_method.assert_called_once_with()
+
+ @mock.patch('{}.get_version'.format(functest_utils.__name__),
+ return_value=VERSION)
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value=URL)
+ def _test_dump(self, *args):
+ with mock.patch.object(decorators, 'open', mock.mock_open(),
+ create=True) as mock_open:
+ self.assertTrue(functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ mock_open.assert_called_once_with(FILE, 'a')
+ handle = mock_open()
+ call_args, _ = handle.write.call_args
+ self.assertIn('POST', call_args[0])
+ self.assertIn(self._get_json(), call_args[0])
+ args[0].assert_called_once_with()
+ args[1].assert_called_once_with()
+
+ @mock.patch('os.makedirs')
+ def test_default_dump(self, mock_method=None):
+ self._test_dump()
+ mock_method.assert_called_once_with(DIR)
+
+ @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
+ def test_makedirs_dir_exists(self, mock_method=None):
+ self._test_dump()
+ mock_method.assert_called_once_with(DIR)
+
+ @mock.patch('{}.get_db_url'.format(functest_utils.__name__),
+ return_value=URL)
+ @mock.patch('os.makedirs', side_effect=OSError)
+ def test_makedirs_exc(self, *args):
+ self.assertFalse(
+ functest_utils.push_results_to_db(
+ self._project_name, self._case_name, self._start_time,
+ self._stop_time, self._result, {}))
+ args[0].assert_called_once_with(DIR)
+ args[1].assert_called_once_with()
+
+
+if __name__ == "__main__":
+ logging.basicConfig()
+ unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_functest_utils.py b/functest/tests/unit/utils/test_functest_utils.py
index 573fcb70f..d48e06f92 100644
--- a/functest/tests/unit/utils/test_functest_utils.py
+++ b/functest/tests/unit/utils/test_functest_utils.py
@@ -11,11 +11,11 @@ import logging
import os
import time
import unittest
-import urllib2
from git.exc import NoSuchPathError
import mock
import requests
+from six.moves import urllib
from functest.tests.unit import test_utils
from functest.utils import functest_utils
@@ -62,31 +62,31 @@ class FunctestUtilsTesting(unittest.TestCase):
self.file_yaml = {'general': {'openstack': {'image_name':
'test_image_name'}}}
- @mock.patch('urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
+ @mock.patch('six.moves.urllib.request.urlopen',
+ side_effect=urllib.error.URLError('no host given'))
def test_check_internet_connectivity_failed(self, mock_method):
self.assertFalse(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_check_internet_connectivity_default(self, mock_method):
self.assertTrue(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_check_internet_connectivity_debian(self, mock_method):
self.url = "https://www.debian.org/"
self.assertTrue(functest_utils.check_internet_connectivity(self.url))
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
+ @mock.patch('six.moves.urllib.request.urlopen',
+ side_effect=urllib.error.URLError('no host given'))
def test_download_url_failed(self, mock_url):
self.assertFalse(functest_utils.download_url(self.url, self.dest_path))
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_download_url_default(self, mock_url):
- with mock.patch("__builtin__.open", mock.mock_open()) as m, \
+ with mock.patch("six.moves.builtins.open", mock.mock_open()) as m, \
mock.patch('functest.utils.functest_utils.shutil.copyfileobj')\
as mock_sh:
name = self.url.rsplit('/')[-1]
@@ -371,7 +371,7 @@ class FunctestUtilsTesting(unittest.TestCase):
attrs = {'readline.side_effect': self.readline_side}
m.configure_mock(**attrs)
- with mock.patch("__builtin__.open") as mo:
+ with mock.patch("six.moves.builtins.open") as mo:
mo.return_value = m
self.assertEqual(functest_utils.get_resolvconf_ns(),
self.test_ip[1:])
@@ -399,7 +399,8 @@ class FunctestUtilsTesting(unittest.TestCase):
mock_logger_error):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
- mock.patch('__builtin__.open', mock.mock_open()) as mopen:
+ mock.patch('six.moves.builtins.open',
+ mock.mock_open()) as mopen:
FunctestUtilsTesting.readline = 0
@@ -428,7 +429,8 @@ class FunctestUtilsTesting(unittest.TestCase):
):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
- mock.patch('__builtin__.open', mock.mock_open()) as mopen:
+ mock.patch('six.moves.builtins.open',
+ mock.mock_open()) as mopen:
FunctestUtilsTesting.readline = 0
@@ -503,7 +505,7 @@ class FunctestUtilsTesting(unittest.TestCase):
@mock.patch('functest.utils.functest_utils.logger.error')
def test_get_dict_by_test(self, mock_logger_error):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
mock.patch('functest.utils.functest_utils.get_testcases_'
@@ -531,7 +533,7 @@ class FunctestUtilsTesting(unittest.TestCase):
def test_get_parameter_from_yaml_failed(self):
self.file_yaml['general'] = None
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
self.assertRaises(ValueError) as excep:
@@ -543,7 +545,7 @@ class FunctestUtilsTesting(unittest.TestCase):
self.parameter) in excep.exception)
def test_get_parameter_from_yaml_default(self):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
@@ -567,7 +569,7 @@ class FunctestUtilsTesting(unittest.TestCase):
mock_criteria.return_value = self.criteria
resp = functest_utils.check_success_rate(self.case_name,
self.result)
- self.assertEqual(resp, 'PASS')
+ self.assertEqual(resp, 100)
def test_check_success_rate_failed(self):
with mock.patch('functest.utils.functest_utils.get_criteria_by_test') \
@@ -575,7 +577,7 @@ class FunctestUtilsTesting(unittest.TestCase):
mock_criteria.return_value = self.criteria
resp = functest_utils.check_success_rate(self.case_name,
0)
- self.assertEqual(resp, 'FAIL')
+ self.assertEqual(resp, 0)
# TODO: merge_dicts
@@ -586,7 +588,7 @@ class FunctestUtilsTesting(unittest.TestCase):
"functest/ci/testcases.yaml")
def test_get_functest_yaml(self):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
diff --git a/functest/utils/config.py b/functest/utils/config.py
index b5b845010..d91f63ac2 100755..100644
--- a/functest/utils/config.py
+++ b/functest/utils/config.py
@@ -1,8 +1,11 @@
-import os
+#!/usr/bin/env python
+import os
import yaml
-import env
+import six
+
+from functest.utils import env
class Config(object):
@@ -16,7 +19,7 @@ class Config(object):
self._set_others()
def _parse(self, attr_now, left_parametes):
- for param_n, param_v in left_parametes.iteritems():
+ for param_n, param_v in six.iteritems(left_parametes):
attr_further = self._get_attr_further(attr_now, param_n)
if attr_further:
self.__setattr__(attr_further, param_v)
@@ -32,8 +35,3 @@ class Config(object):
CONF = Config()
-
-if __name__ == "__main__":
- print CONF.vnf_cloudify_ims
- print CONF.vnf_cloudify_ims_tenant_images
- print CONF.vnf_cloudify_ims_tenant_images_centos_7
diff --git a/functest/utils/constants.py b/functest/utils/constants.py
index 2e8eb3f44..75c97c765 100755..100644
--- a/functest/utils/constants.py
+++ b/functest/utils/constants.py
@@ -1,20 +1,17 @@
-import config
-import env
+#!/usr/bin/env python
+
+import six
+
+from functest.utils import config
+from functest.utils import env
class Constants(object):
def __init__(self):
- for attr_n, attr_v in config.CONF.__dict__.iteritems():
+ for attr_n, attr_v in six.iteritems(config.CONF.__dict__):
self.__setattr__(attr_n, attr_v)
- for env_n, env_v in env.ENV.__dict__.iteritems():
+ for env_n, env_v in six.iteritems(env.ENV.__dict__):
self.__setattr__(env_n, env_v)
CONST = Constants()
-
-if __name__ == '__main__':
- print CONST.__dict__
- print CONST.NODE_NAME
- print CONST.vIMS_clearwater_blueprint_url
- print CONST.vIMS_clearwater_blueprint_file_name
- print CONST.vIMS_clearwater_blueprint_name
diff --git a/functest/utils/decorators.py b/functest/utils/decorators.py
index 46ffe35d1..73e0a3520 100644
--- a/functest/utils/decorators.py
+++ b/functest/utils/decorators.py
@@ -3,18 +3,19 @@
# pylint: disable=missing-docstring
import errno
+import functools
import os
-import urlparse
import mock
import requests.sessions
+from six.moves import urllib
def can_dump_request_to_file(method):
def dump_preparedrequest(request, **kwargs):
# pylint: disable=unused-argument
- parseresult = urlparse.urlparse(request.url)
+ parseresult = urllib.parse.urlparse(request.url)
if parseresult.scheme == "file":
try:
dirname = os.path.dirname(parseresult.path)
@@ -33,7 +34,7 @@ def can_dump_request_to_file(method):
def patch_request(method, url, **kwargs):
with requests.sessions.Session() as session:
- parseresult = urlparse.urlparse(url)
+ parseresult = urllib.parse.urlparse(url)
if parseresult.scheme == "file":
with mock.patch.object(session, 'send',
side_effect=dump_preparedrequest):
@@ -41,6 +42,7 @@ def can_dump_request_to_file(method):
else:
return session.request(method=method, url=url, **kwargs)
+ @functools.wraps(method)
def hook(*args, **kwargs):
with mock.patch('requests.api.request', side_effect=patch_request):
return method(*args, **kwargs)
diff --git a/functest/utils/env.py b/functest/utils/env.py
index 7e4df2ea5..c9629e153 100644
--- a/functest/utils/env.py
+++ b/functest/utils/env.py
@@ -1,6 +1,11 @@
+#!/usr/bin/env python
+
import os
import re
+import six
+
+
default_envs = {
'NODE_NAME': 'unknown_pod',
'CI_DEBUG': 'false',
@@ -17,9 +22,9 @@ default_envs = {
class Environment(object):
def __init__(self):
- for k, v in os.environ.iteritems():
+ for k, v in six.iteritems(os.environ):
self.__setattr__(k, v)
- for k, v in default_envs.iteritems():
+ for k, v in six.iteritems(default_envs):
if k not in os.environ:
self.__setattr__(k, v)
self._set_ci_run()
diff --git a/functest/utils/functest_utils.py b/functest/utils/functest_utils.py
index 3e85b9e46..744258b36 100644
--- a/functest/utils/functest_utils.py
+++ b/functest/utils/functest_utils.py
@@ -16,11 +16,11 @@ import shutil
import subprocess
import sys
import time
-import urllib2
from datetime import datetime as dt
import dns.resolver
import requests
+from six.moves import urllib
import yaml
from git import Repo
@@ -39,9 +39,9 @@ def check_internet_connectivity(url='http://www.opnfv.org/'):
Check if there is access to the internet
"""
try:
- urllib2.urlopen(url, timeout=5)
+ urllib.request.urlopen(url, timeout=5)
return True
- except urllib2.URLError:
+ except urllib.error.URLError:
return False
@@ -52,8 +52,8 @@ def download_url(url, dest_path):
name = url.rsplit('/')[-1]
dest = dest_path + "/" + name
try:
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
+ response = urllib.request.urlopen(url)
+ except (urllib.error.HTTPError, urllib.error.URLError):
return False
with open(dest, 'wb') as f:
@@ -318,7 +318,7 @@ def execute_command(cmd, info=False, error_msg="",
f.write(line)
else:
line = line.replace('\n', '')
- print line
+ print(line)
sys.stdout.flush()
if output_file:
f.close()
@@ -385,8 +385,8 @@ def check_success_rate(case_name, result):
logger.warning('check_success_rate will be removed soon')
criteria = get_criteria_by_test(case_name)
if type(criteria) == int and result >= criteria:
- return 'PASS'
- return 'FAIL'
+ return 100
+ return 0
def merge_dicts(dict1, dict2):
diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py
index 7e00a269c..8bd950528 100644
--- a/functest/utils/openstack_utils.py
+++ b/functest/utils/openstack_utils.py
@@ -286,7 +286,7 @@ def get_instances(nova_client):
try:
instances = nova_client.servers.list(search_opts={'all_tenants': 1})
return instances
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_instances(nova_client)]: %s" % e)
return None
@@ -295,7 +295,7 @@ def get_instance_status(nova_client, instance):
try:
instance = nova_client.servers.get(instance.id)
return instance.status
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_instance_status(nova_client)]: %s" % e)
return None
@@ -304,7 +304,7 @@ def get_instance_by_name(nova_client, instance_name):
try:
instance = nova_client.servers.find(name=instance_name)
return instance
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_instance_by_name(nova_client, '%s')]: %s"
% (instance_name, e))
return None
@@ -334,7 +334,7 @@ def get_aggregates(nova_client):
try:
aggregates = nova_client.aggregates.list()
return aggregates
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_aggregates(nova_client)]: %s" % e)
return None
@@ -344,7 +344,7 @@ def get_aggregate_id(nova_client, aggregate_name):
aggregates = get_aggregates(nova_client)
_id = [ag.id for ag in aggregates if ag.name == aggregate_name][0]
return _id
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_aggregate_id(nova_client, %s)]:"
" %s" % (aggregate_name, e))
return None
@@ -354,7 +354,7 @@ def get_availability_zones(nova_client):
try:
availability_zones = nova_client.availability_zones.list()
return availability_zones
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_availability_zones(nova_client)]: %s" % e)
return None
@@ -363,7 +363,7 @@ def get_availability_zone_names(nova_client):
try:
az_names = [az.zoneName for az in get_availability_zones(nova_client)]
return az_names
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_availability_zone_names(nova_client)]:"
" %s" % e)
return None
@@ -381,7 +381,7 @@ def create_flavor(nova_client, flavor_name, ram, disk, vcpus, public=True):
# flavor extra specs are not configured, therefore skip the update
pass
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
"'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
return None
@@ -414,7 +414,7 @@ def get_floating_ips(nova_client):
try:
floating_ips = nova_client.floating_ips.list()
return floating_ips
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_floating_ips(nova_client)]: %s" % e)
return None
@@ -427,7 +427,7 @@ def get_hypervisors(nova_client):
if hypervisor.state == "up":
nodes.append(hypervisor.hypervisor_hostname)
return nodes
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_hypervisors(nova_client)]: %s" % e)
return None
@@ -436,7 +436,7 @@ def create_aggregate(nova_client, aggregate_name, av_zone):
try:
nova_client.aggregates.create(aggregate_name, av_zone)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, av_zone, e))
return None
@@ -447,7 +447,7 @@ def add_host_to_aggregate(nova_client, aggregate_name, compute_host):
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.add_host(aggregate_id, compute_host)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_host_to_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, compute_host, e))
return None
@@ -459,7 +459,7 @@ def create_aggregate_with_host(
create_aggregate(nova_client, aggregate_name, av_zone)
add_host_to_aggregate(nova_client, aggregate_name, compute_host)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_aggregate_with_host("
"nova_client, %s, %s, %s)]: %s"
% (aggregate_name, av_zone, compute_host, e))
@@ -552,7 +552,7 @@ def create_floating_ip(neutron_client):
ip_json = neutron_client.create_floatingip({'floatingip': props})
fip_addr = ip_json['floatingip']['floating_ip_address']
fip_id = ip_json['floatingip']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
return None
return {'fip_addr': fip_addr, 'fip_id': fip_id}
@@ -562,7 +562,7 @@ def add_floating_ip(nova_client, server_id, floatingip_addr):
try:
nova_client.servers.add_floating_ip(server_id, floatingip_addr)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_floating_ip(nova_client, '%s', '%s')]: %s"
% (server_id, floatingip_addr, e))
return False
@@ -572,7 +572,7 @@ def delete_instance(nova_client, instance_id):
try:
nova_client.servers.force_delete(instance_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_instance(nova_client, '%s')]: %s"
% (instance_id, e))
return False
@@ -582,7 +582,7 @@ def delete_floating_ip(nova_client, floatingip_id):
try:
nova_client.floating_ips.delete(floatingip_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_floating_ip(nova_client, '%s')]: %s"
% (floatingip_id, e))
return False
@@ -593,7 +593,7 @@ def remove_host_from_aggregate(nova_client, aggregate_name, compute_host):
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.remove_host(aggregate_id, compute_host)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [remove_host_from_aggregate(nova_client, %s, %s)]:"
" %s" % (aggregate_name, compute_host, e))
return False
@@ -612,7 +612,7 @@ def delete_aggregate(nova_client, aggregate_name):
remove_hosts_from_aggregate(nova_client, aggregate_name)
nova_client.aggregates.delete(aggregate_name)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_aggregate(nova_client, %s)]: %s"
% (aggregate_name, e))
return False
@@ -715,7 +715,7 @@ def create_neutron_net(neutron_client, name):
network = neutron_client.create_network(body=json_body)
network_dict = network['network']
return network_dict['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
% (name, e))
return None
@@ -727,7 +727,7 @@ def create_neutron_subnet(neutron_client, name, cidr, net_id):
try:
subnet = neutron_client.create_subnet(body=json_body)
return subnet['subnets'][0]['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
return None
@@ -738,7 +738,7 @@ def create_neutron_router(neutron_client, name):
try:
router = neutron_client.create_router(json_body)
return router['router']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
% (name, e))
return None
@@ -754,7 +754,7 @@ def create_neutron_port(neutron_client, name, network_id, ip):
try:
port = neutron_client.create_port(body=json_body)
return port['port']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
return None
@@ -765,7 +765,7 @@ def update_neutron_net(neutron_client, network_id, shared=False):
try:
neutron_client.update_network(network_id, body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
"%s" % (network_id, str(shared), e))
return False
@@ -779,7 +779,7 @@ def update_neutron_port(neutron_client, port_id, device_owner):
port = neutron_client.update_port(port=port_id,
body=json_body)
return port['port']['id']
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
return None
@@ -790,7 +790,7 @@ def add_interface_router(neutron_client, router_id, subnet_id):
try:
neutron_client.add_interface_router(router=router_id, body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
@@ -802,7 +802,7 @@ def add_gateway_router(neutron_client, router_id):
try:
neutron_client.add_gateway_router(router_id, router_dict)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
@@ -812,7 +812,7 @@ def delete_neutron_net(neutron_client, network_id):
try:
neutron_client.delete_network(network_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
% (network_id, e))
return False
@@ -822,7 +822,7 @@ def delete_neutron_subnet(neutron_client, subnet_id):
try:
neutron_client.delete_subnet(subnet_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
% (subnet_id, e))
return False
@@ -832,7 +832,7 @@ def delete_neutron_router(neutron_client, router_id):
try:
neutron_client.delete_router(router=router_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
@@ -842,7 +842,7 @@ def delete_neutron_port(neutron_client, port_id):
try:
neutron_client.delete_port(port_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
% (port_id, e))
return False
@@ -854,7 +854,7 @@ def remove_interface_router(neutron_client, router_id, subnet_id):
neutron_client.remove_interface_router(router=router_id,
body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [remove_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
@@ -864,7 +864,7 @@ def remove_gateway_router(neutron_client, router_id):
try:
neutron_client.remove_gateway_router(router_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
@@ -994,7 +994,7 @@ def get_security_groups(neutron_client):
security_groups = neutron_client.list_security_groups()[
'security_groups']
return security_groups
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
return None
@@ -1015,7 +1015,7 @@ def create_security_group(neutron_client, sg_name, sg_description):
try:
secgroup = neutron_client.create_security_group(json_body)
return secgroup['security_group']
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_security_group(neutron_client, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
return None
@@ -1070,7 +1070,7 @@ def get_security_group_rules(neutron_client, sg_id):
security_rules = [rule for rule in security_rules
if rule["security_group_id"] == sg_id]
return security_rules
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
" %s" % e)
return None
@@ -1089,7 +1089,7 @@ def check_security_group_rules(neutron_client, sg_id, direction, protocol,
return True
else:
return False
- except Exception, e:
+ except Exception as e:
logger.error("Error [check_security_group_rules("
" neutron_client, sg_id, direction,"
" protocol, port_min=None, port_max=None)]: "
@@ -1141,7 +1141,7 @@ def add_secgroup_to_instance(nova_client, instance_id, secgroup_id):
try:
nova_client.servers.add_security_group(instance_id, secgroup_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_secgroup_to_instance(nova_client, '%s', "
"'%s')]: %s" % (instance_id, secgroup_id, e))
return False
@@ -1157,7 +1157,7 @@ def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
neutron_client.update_quota(tenant_id=tenant_id,
body=json_body)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
"'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
return False
@@ -1167,7 +1167,7 @@ def delete_security_group(neutron_client, secgroup_id):
try:
neutron_client.delete_security_group(secgroup_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
% (secgroup_id, e))
return False
@@ -1180,7 +1180,7 @@ def get_images(nova_client):
try:
images = nova_client.images.list()
return images
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_images]: %s" % e)
return None
@@ -1216,7 +1216,7 @@ def create_glance_image(glance_client, image_name, file_path, disk="qcow2",
with open(file_path) as image_data:
glance_client.images.upload(image_id, image_data)
return image_id
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
"'%s')]: %s" % (image_name, file_path, public, e))
return None
@@ -1246,7 +1246,7 @@ def delete_glance_image(nova_client, image_id):
try:
nova_client.images.delete(image_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_glance_image(nova_client, '%s')]: %s"
% (image_id, e))
return False
@@ -1259,7 +1259,7 @@ def get_volumes(cinder_client):
try:
volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
return volumes
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_volumes(cinder_client)]: %s" % e)
return None
@@ -1272,7 +1272,7 @@ def list_volume_types(cinder_client, public=True, private=True):
if not private:
volume_types = [vt for vt in volume_types if vt.is_public]
return volume_types
- except Exception, e:
+ except Exception as e:
logger.error("Error [list_volume_types(cinder_client)]: %s" % e)
return None
@@ -1281,7 +1281,7 @@ def create_volume_type(cinder_client, name):
try:
volume_type = cinder_client.volume_types.create(name)
return volume_type
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_volume_type(cinder_client, '%s')]: %s"
% (name, e))
return None
@@ -1296,7 +1296,7 @@ def update_cinder_quota(cinder_client, tenant_id, vols_quota,
try:
cinder_client.quotas.update(tenant_id, **quotas_values)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', "
"'%s' '%s')]: %s" % (tenant_id, vols_quota,
snapshots_quota, gigabytes_quota, e))
@@ -1314,7 +1314,7 @@ def delete_volume(cinder_client, volume_id, forced=False):
else:
cinder_client.volumes.delete(volume_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s"
% (volume_id, str(forced), e))
return False
@@ -1324,7 +1324,7 @@ def delete_volume_type(cinder_client, volume_type):
try:
cinder_client.volume_types.delete(volume_type)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_volume_type(cinder_client, '%s')]: %s"
% (volume_type, e))
return False
@@ -1340,7 +1340,7 @@ def get_tenants(keystone_client):
else:
tenants = keystone_client.tenants.list()
return tenants
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_tenants(keystone_client)]: %s" % e)
return None
@@ -1349,7 +1349,7 @@ def get_users(keystone_client):
try:
users = keystone_client.users.list()
return users
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_users(keystone_client)]: %s" % e)
return None
@@ -1397,7 +1397,7 @@ def create_tenant(keystone_client, tenant_name, tenant_description):
tenant_description,
enabled=True)
return tenant.id
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s"
% (tenant_name, tenant_description, e))
return None
@@ -1428,7 +1428,7 @@ def create_user(keystone_client, user_name, user_password,
tenant_id,
enabled=True)
return user.id
- except Exception, e:
+ except Exception as e:
logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'"
"'%s')]: %s" % (user_name, user_password,
user_email, tenant_id, e))
@@ -1453,7 +1453,7 @@ def add_role_user(keystone_client, user_id, role_id, tenant_id):
else:
keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [add_role_user(keystone_client, '%s', '%s'"
"'%s')]: %s " % (user_id, role_id, tenant_id, e))
return False
@@ -1466,7 +1466,7 @@ def delete_tenant(keystone_client, tenant_id):
else:
keystone_client.tenants.delete(tenant_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_tenant(keystone_client, '%s')]: %s"
% (tenant_id, e))
return False
@@ -1476,7 +1476,7 @@ def delete_user(keystone_client, user_id):
try:
keystone_client.users.delete(user_id)
return True
- except Exception, e:
+ except Exception as e:
logger.error("Error [delete_user(keystone_client, '%s')]: %s"
% (user_id, e))
return False
@@ -1489,6 +1489,6 @@ def get_resource(heat_client, stack_id, resource):
try:
resources = heat_client.resources.get(stack_id, resource)
return resources
- except Exception, e:
+ except Exception as e:
logger.error("Error [get_resource]: %s" % e)
return None
diff --git a/requirements.txt b/requirements.txt
index e709220a5..4170157c1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,14 +17,16 @@ python-congressclient==1.5.0
virtualenv==15.1.0
pexpect==4.0
requests>=2.8.0
-robotframework==2.9.1
-robotframework-requests==0.3.8
-robotframework-sshlibrary==2.1.1
+robotframework==3.0.2
+robotframework-httplibrary==0.4.2
+robotframework-requests==0.4.7
+robotframework-sshlibrary==2.1.3
+jmespath==0.9.2
configObj==5.0.6
Flask==0.10.1
xmltodict==0.9.2
scp==0.10.2
-paramiko==1.16.0
+paramiko==2.1.2
subprocess32
shyaml
dnspython
@@ -33,3 +35,5 @@ click==6.6
openbaton-cli==2.2.1-beta7
mock==1.3.0
iniparse==0.4
+PrettyTable>=0.7.1,<0.8 # BSD
+six>=1.9.0 # MIT
diff --git a/test-requirements.txt b/test-requirements.txt
index 471e9c30a..4ba763a5f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -21,8 +21,11 @@ python-openstackclient==2.3.0
python-tackerclient==0.7.0
pyyaml==3.10
requests==2.8.0
-robotframework==2.9.1
-robotframework-requests==0.3.8
-robotframework-sshlibrary==2.1.1
+robotframework==3.0.2
+robotframework-httplibrary==0.4.2
+robotframework-requests==0.4.7
+robotframework-sshlibrary==2.1.3
subprocess32==3.2.7
virtualenv==15.1.0
+PrettyTable>=0.7.1,<0.8 # BSD
+six>=1.9.0 # MIT