aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests')
-rw-r--r--functest/opnfv_tests/openstack/api/connection_check.py2
-rw-r--r--functest/opnfv_tests/openstack/barbican/barbican.py6
-rw-r--r--functest/opnfv_tests/openstack/cinder/cinder_test.py6
-rw-r--r--functest/opnfv_tests/openstack/patrole/patrole.py6
-rw-r--r--functest/opnfv_tests/openstack/rally/rally.py69
-rw-r--r--functest/opnfv_tests/openstack/shaker/shaker.py6
-rw-r--r--functest/opnfv_tests/openstack/tempest/tempest.py138
-rw-r--r--functest/opnfv_tests/openstack/vmtp/vmtp.py6
-rw-r--r--functest/opnfv_tests/openstack/vping/vping_ssh.py6
-rw-r--r--functest/opnfv_tests/openstack/vping/vping_userdata.py6
-rw-r--r--functest/opnfv_tests/sdn/odl/odl.py4
-rw-r--r--functest/opnfv_tests/vnf/epc/juju_epc.py12
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.py10
-rw-r--r--functest/opnfv_tests/vnf/ims/heat_ims.py10
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.py10
-rw-r--r--functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py13
-rw-r--r--functest/opnfv_tests/vnf/router/utilvnf.py25
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py6
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py18
19 files changed, 165 insertions, 194 deletions
diff --git a/functest/opnfv_tests/openstack/api/connection_check.py b/functest/opnfv_tests/openstack/api/connection_check.py
index f3b35e9d9..eaf9767c0 100644
--- a/functest/opnfv_tests/openstack/api/connection_check.py
+++ b/functest/opnfv_tests/openstack/api/connection_check.py
@@ -34,7 +34,7 @@ class ConnectionCheck(testcase.TestCase):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'connection_check'
- super(ConnectionCheck, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.output_log_name = 'functest.log'
self.output_debug_log_name = 'functest.debug.log'
try:
diff --git a/functest/opnfv_tests/openstack/barbican/barbican.py b/functest/opnfv_tests/openstack/barbican/barbican.py
index 7b1bb24f7..0c2429e10 100644
--- a/functest/opnfv_tests/openstack/barbican/barbican.py
+++ b/functest/opnfv_tests/openstack/barbican/barbican.py
@@ -9,8 +9,6 @@
# pylint: disable=missing-docstring
-import logging
-
from six.moves import configparser
from functest.opnfv_tests.openstack.tempest import tempest
@@ -18,10 +16,8 @@ from functest.opnfv_tests.openstack.tempest import tempest
class Barbican(tempest.TempestCommon):
- __logger = logging.getLogger(__name__)
-
def configure(self, **kwargs):
- super(Barbican, self).configure(**kwargs)
+ super().configure(**kwargs)
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
if not rconfig.has_section('auth'):
diff --git a/functest/opnfv_tests/openstack/cinder/cinder_test.py b/functest/opnfv_tests/openstack/cinder/cinder_test.py
index d81bb100a..47bf41b8d 100644
--- a/functest/opnfv_tests/openstack/cinder/cinder_test.py
+++ b/functest/opnfv_tests/openstack/cinder/cinder_test.py
@@ -35,7 +35,7 @@ class CinderCheck(singlevm.SingleVm2):
"""Initialize testcase."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cinder_test"
- super(CinderCheck, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.vm2 = None
self.fip2 = None
@@ -52,7 +52,7 @@ class CinderCheck(singlevm.SingleVm2):
return self._write_data() or self._read_data()
def prepare(self):
- super(CinderCheck, self).prepare()
+ super().prepare()
self.vm2 = self.boot_vm(
'{}-vm2_{}'.format(self.case_name, self.guid),
key_name=self.keypair.id,
@@ -124,4 +124,4 @@ class CinderCheck(singlevm.SingleVm2):
self.cloud.delete_floating_ip(self.fip2.id)
if self.volume:
self.cloud.delete_volume(self.volume.id)
- super(CinderCheck, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/openstack/patrole/patrole.py b/functest/opnfv_tests/openstack/patrole/patrole.py
index 8613d5127..bdf18d35e 100644
--- a/functest/opnfv_tests/openstack/patrole/patrole.py
+++ b/functest/opnfv_tests/openstack/patrole/patrole.py
@@ -9,8 +9,6 @@
# pylint: disable=missing-docstring
-import logging
-
from six.moves import configparser
from functest.opnfv_tests.openstack.tempest import tempest
@@ -18,10 +16,8 @@ from functest.opnfv_tests.openstack.tempest import tempest
class Patrole(tempest.TempestCommon):
- __logger = logging.getLogger(__name__)
-
def configure(self, **kwargs):
- super(Patrole, self).configure(**kwargs)
+ super().configure(**kwargs)
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
if not rconfig.has_section('rbac'):
diff --git a/functest/opnfv_tests/openstack/rally/rally.py b/functest/opnfv_tests/openstack/rally/rally.py
index 548e774a1..36f377ccf 100644
--- a/functest/opnfv_tests/openstack/rally/rally.py
+++ b/functest/opnfv_tests/openstack/rally/rally.py
@@ -74,7 +74,7 @@ class RallyBase(singlevm.VmReady2):
def __init__(self, **kwargs):
"""Initialize RallyBase object."""
- super(RallyBase, self).__init__(**kwargs)
+ super().__init__(**kwargs)
assert self.orig_cloud
assert self.project
if self.orig_cloud.get_role("admin"):
@@ -174,10 +174,10 @@ class RallyBase(singlevm.VmReady2):
cmd = ("rally deployment list | awk '/" +
getattr(config.CONF, 'rally_deployment_name') +
"/ {print $2}'")
- proc = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- deployment_uuid = proc.stdout.readline().rstrip()
+ with subprocess.Popen(
+ cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) as proc:
+ deployment_uuid = proc.stdout.readline().rstrip()
return deployment_uuid.decode("utf-8")
@staticmethod
@@ -362,31 +362,25 @@ class RallyBase(singlevm.VmReady2):
def apply_blacklist(self, case_file_name, result_file_name):
"""Apply blacklist."""
LOGGER.debug("Applying blacklist...")
- cases_file = open(case_file_name, 'r')
- result_file = open(result_file_name, 'w')
-
- black_tests = list(set(self.excl_func() +
- self.excl_scenario()))
-
- if black_tests:
- LOGGER.debug("Blacklisted tests: %s", str(black_tests))
-
- include = True
- for cases_line in cases_file:
- if include:
- for black_tests_line in black_tests:
- if re.search(black_tests_line,
- cases_line.strip().rstrip(':')):
- include = False
- break
+ with open(case_file_name, 'r') as cases_file, open(
+ result_file_name, 'w') as result_file:
+ black_tests = list(set(self.excl_func() + self.excl_scenario()))
+ if black_tests:
+ LOGGER.debug("Blacklisted tests: %s", str(black_tests))
+
+ include = True
+ for cases_line in cases_file:
+ if include:
+ for black_tests_line in black_tests:
+ if re.search(black_tests_line,
+ cases_line.strip().rstrip(':')):
+ include = False
+ break
+ else:
+ result_file.write(str(cases_line))
else:
- result_file.write(str(cases_line))
- else:
- if cases_line.isspace():
- include = True
-
- cases_file.close()
- result_file.close()
+ if cases_line.isspace():
+ include = True
@staticmethod
def file_is_empty(file_name):
@@ -422,7 +416,8 @@ class RallyBase(singlevm.VmReady2):
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOGGER.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- json_results = open(report_json_dir).read()
+ with open(report_json_dir) as json_file:
+ json_results = json_file.read()
self._append_summary(json_results, test_name)
# parse JSON operation result
@@ -648,7 +643,7 @@ class RallyBase(singlevm.VmReady2):
self.clean_rally_logs()
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(RallyBase, self).clean()
+ super().clean()
def is_successful(self):
"""The overall result of the test."""
@@ -656,7 +651,7 @@ class RallyBase(singlevm.VmReady2):
if item['task_status'] is False:
return testcase.TestCase.EX_TESTCASE_FAILED
- return super(RallyBase, self).is_successful()
+ return super().is_successful()
@staticmethod
def update_rally_logs(res_dir, rally_conf='/etc/rally/rally.conf'):
@@ -692,7 +687,7 @@ class RallyBase(singlevm.VmReady2):
"""Run testcase."""
self.start_time = time.time()
try:
- assert super(RallyBase, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.update_rally_logs(self.res_dir)
self.create_rally_deployment(environ=self.project.get_environ())
@@ -720,7 +715,7 @@ class RallySanity(RallyBase):
"""Initialize RallySanity object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "rally_sanity"
- super(RallySanity, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.smoke = True
self.scenario_dir = os.path.join(self.rally_scenario_dir, 'sanity')
@@ -734,7 +729,7 @@ class RallyFull(RallyBase):
"""Initialize RallyFull object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "rally_full"
- super(RallyFull, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.smoke = False
self.scenario_dir = os.path.join(self.rally_scenario_dir, 'full')
@@ -749,13 +744,13 @@ class RallyJobs(RallyBase):
"""Initialize RallyJobs object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "rally_jobs"
- super(RallyJobs, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.task_file = os.path.join(self.rally_dir, 'rally_jobs.yaml')
self.task_yaml = None
def prepare_run(self, **kwargs):
"""Create resources needed by test scenarios."""
- super(RallyJobs, self).prepare_run(**kwargs)
+ super().prepare_run(**kwargs)
with open(os.path.join(self.rally_dir,
'rally_jobs.yaml'), 'r') as task_file:
self.task_yaml = yaml.safe_load(task_file)
diff --git a/functest/opnfv_tests/openstack/shaker/shaker.py b/functest/opnfv_tests/openstack/shaker/shaker.py
index 80d3f4547..7d9922060 100644
--- a/functest/opnfv_tests/openstack/shaker/shaker.py
+++ b/functest/opnfv_tests/openstack/shaker/shaker.py
@@ -47,7 +47,7 @@ class Shaker(singlevm.SingleVm2):
check_console_loop = 12
def __init__(self, **kwargs):
- super(Shaker, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.role = None
def check_requirements(self):
@@ -57,7 +57,7 @@ class Shaker(singlevm.SingleVm2):
self.project.clean()
def prepare(self):
- super(Shaker, self).prepare()
+ super().prepare()
self.cloud.create_security_group_rule(
self.sec.id, port_range_min=self.port, port_range_max=self.port,
protocol='tcp', direction='ingress')
@@ -142,6 +142,6 @@ class Shaker(singlevm.SingleVm2):
return stdout.channel.recv_exit_status()
def clean(self):
- super(Shaker, self).clean()
+ super().clean()
if self.role:
self.orig_cloud.delete_role(self.role.id)
diff --git a/functest/opnfv_tests/openstack/tempest/tempest.py b/functest/opnfv_tests/openstack/tempest/tempest.py
index 60c117dea..60224019c 100644
--- a/functest/opnfv_tests/openstack/tempest/tempest.py
+++ b/functest/opnfv_tests/openstack/tempest/tempest.py
@@ -57,7 +57,7 @@ class TempestCommon(singlevm.VmReady2):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tempest'
- super(TempestCommon, self).__init__(**kwargs)
+ super().__init__(**kwargs)
assert self.orig_cloud
assert self.cloud
assert self.project
@@ -142,22 +142,22 @@ class TempestCommon(singlevm.VmReady2):
}
cmd = ["rally", "verify", "show", "--uuid", verif_id]
LOGGER.info("Showing result for a verification: '%s'.", cmd)
- proc = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- for line in proc.stdout:
- LOGGER.info(line.decode("utf-8").rstrip())
- new_line = line.decode("utf-8").replace(' ', '').split('|')
- if 'Tests' in new_line:
- break
- if 'Testscount' in new_line:
- result['num_tests'] = int(new_line[2])
- elif 'Success' in new_line:
- result['num_success'] = int(new_line[2])
- elif 'Skipped' in new_line:
- result['num_skipped'] = int(new_line[2])
- elif 'Failures' in new_line:
- result['num_failures'] = int(new_line[2])
+ with subprocess.Popen(
+ cmd, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT) as proc:
+ for line in proc.stdout:
+ LOGGER.info(line.decode("utf-8").rstrip())
+ new_line = line.decode("utf-8").replace(' ', '').split('|')
+ if 'Tests' in new_line:
+ break
+ if 'Testscount' in new_line:
+ result['num_tests'] = int(new_line[2])
+ elif 'Success' in new_line:
+ result['num_success'] = int(new_line[2])
+ elif 'Skipped' in new_line:
+ result['num_skipped'] = int(new_line[2])
+ elif 'Failures' in new_line:
+ result['num_failures'] = int(new_line[2])
return result
@staticmethod
@@ -199,10 +199,10 @@ class TempestCommon(singlevm.VmReady2):
cmd = ("rally verify list-verifiers | awk '/" +
getattr(config.CONF, 'tempest_verifier_name') +
"/ {print $2}'")
- proc = subprocess.Popen(cmd, shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL)
- verifier_uuid = proc.stdout.readline().rstrip()
+ with subprocess.Popen(
+ cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL) as proc:
+ verifier_uuid = proc.stdout.readline().rstrip()
return verifier_uuid.decode("utf-8")
@staticmethod
@@ -342,32 +342,31 @@ class TempestCommon(singlevm.VmReady2):
os.remove(self.raw_list)
os.rename(self.list, self.raw_list)
cases_file = self.read_file(self.raw_list)
- result_file = open(self.list, 'w')
- black_tests = []
- try:
- deploy_scenario = env.get('DEPLOY_SCENARIO')
- if bool(deploy_scenario):
- # if DEPLOY_SCENARIO is set we read the file
- black_list_file = open(black_list)
- black_list_yaml = yaml.safe_load(black_list_file)
- black_list_file.close()
- for item in black_list_yaml:
- scenarios = item['scenarios']
- in_it = rally.RallyBase.in_iterable_re
- if in_it(deploy_scenario, scenarios):
- tests = item['tests']
- black_tests.extend(tests)
- except Exception: # pylint: disable=broad-except
+ with open(self.list, 'w') as result_file:
black_tests = []
- LOGGER.debug("Tempest blacklist file does not exist.")
+ try:
+ deploy_scenario = env.get('DEPLOY_SCENARIO')
+ if bool(deploy_scenario):
+ # if DEPLOY_SCENARIO is set we read the file
+ with open(black_list) as black_list_file:
+ black_list_yaml = yaml.safe_load(black_list_file)
+ black_list_file.close()
+ for item in black_list_yaml:
+ scenarios = item['scenarios']
+ in_it = rally.RallyBase.in_iterable_re
+ if in_it(deploy_scenario, scenarios):
+ tests = item['tests']
+ black_tests.extend(tests)
+ except Exception: # pylint: disable=broad-except
+ black_tests = []
+ LOGGER.debug("Tempest blacklist file does not exist.")
- for cases_line in cases_file:
- for black_tests_line in black_tests:
- if re.search(black_tests_line, cases_line):
- break
- else:
- result_file.write(str(cases_line) + '\n')
- result_file.close()
+ for cases_line in cases_file:
+ for black_tests_line in black_tests:
+ if re.search(black_tests_line, cases_line):
+ break
+ else:
+ result_file.write(str(cases_line) + '\n')
def run_verifier_tests(self, **kwargs):
"""Execute tempest test cases."""
@@ -376,25 +375,24 @@ class TempestCommon(singlevm.VmReady2):
cmd.extend(kwargs.get('option', []))
LOGGER.info("Starting Tempest test suite: '%s'.", cmd)
- f_stdout = open(
- os.path.join(self.res_dir, "tempest.log"), 'w+')
-
- proc = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- bufsize=1)
-
- with proc.stdout:
- for line in iter(proc.stdout.readline, b''):
- if re.search(r"\} tempest\.", line.decode("utf-8")):
- LOGGER.info(line.rstrip())
- elif re.search(r'(?=\(UUID=(.*)\))', line.decode("utf-8")):
- self.verification_id = re.search(
- r'(?=\(UUID=(.*)\))', line.decode("utf-8")).group(1)
- f_stdout.write(line.decode("utf-8"))
- proc.wait()
- f_stdout.close()
+ with open(
+ os.path.join(self.res_dir, "tempest.log"), 'w+') as f_stdout:
+
+ with subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ bufsize=1) as proc:
+
+ with proc.stdout:
+ for line in iter(proc.stdout.readline, b''):
+ if re.search(r"\} tempest\.", line.decode("utf-8")):
+ LOGGER.info(line.rstrip())
+ elif re.search(r'(?=\(UUID=(.*)\))',
+ line.decode("utf-8")):
+ self.verification_id = re.search(
+ r'(?=\(UUID=(.*)\))',
+ line.decode("utf-8")).group(1)
+ f_stdout.write(line.decode("utf-8"))
+ proc.wait()
if self.verification_id is None:
raise Exception('Verification UUID not found')
@@ -643,7 +641,7 @@ class TempestCommon(singlevm.VmReady2):
def run(self, **kwargs):
self.start_time = time.time()
try:
- assert super(TempestCommon, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
if not os.path.exists(self.res_dir):
os.makedirs(self.res_dir)
@@ -683,7 +681,7 @@ class TempestCommon(singlevm.VmReady2):
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(TempestCommon, self).clean()
+ super().clean()
def is_successful(self):
"""The overall result of the test."""
@@ -693,7 +691,7 @@ class TempestCommon(singlevm.VmReady2):
if self.tests_count and (
self.details.get("tests_number", 0) != self.tests_count):
return testcase.TestCase.EX_TESTCASE_FAILED
- return super(TempestCommon, self).is_successful()
+ return super().is_successful()
class TempestHeat(TempestCommon):
@@ -706,7 +704,7 @@ class TempestHeat(TempestCommon):
flavor_alt_disk = 4
def __init__(self, **kwargs):
- super(TempestHeat, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.user2 = self.orig_cloud.create_user(
name='{}-user2_{}'.format(self.case_name, self.project.guid),
password=self.project.password,
@@ -723,7 +721,7 @@ class TempestHeat(TempestCommon):
def configure(self, **kwargs):
assert self.user2
- super(TempestHeat, self).configure(**kwargs)
+ super().configure(**kwargs)
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
if not rconfig.has_section('heat_plugin'):
@@ -774,6 +772,6 @@ class TempestHeat(TempestCommon):
"""
Cleanup all OpenStack objects. Should be called on completion.
"""
- super(TempestHeat, self).clean()
+ super().clean()
if self.user2:
self.orig_cloud.delete_user(self.user2.id)
diff --git a/functest/opnfv_tests/openstack/vmtp/vmtp.py b/functest/opnfv_tests/openstack/vmtp/vmtp.py
index 1686489b8..b0e6ff427 100644
--- a/functest/opnfv_tests/openstack/vmtp/vmtp.py
+++ b/functest/opnfv_tests/openstack/vmtp/vmtp.py
@@ -56,7 +56,7 @@ class Vmtp(singlevm.VmReady2):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmtp'
- super(Vmtp, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.config = "{}/vmtp.conf".format(self.res_dir)
(_, self.privkey_filename) = tempfile.mkstemp()
(_, self.pubkey_filename) = tempfile.mkstemp()
@@ -173,7 +173,7 @@ class Vmtp(singlevm.VmReady2):
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(Vmtp, self).run(**kwargs) == self.EX_OK
+ assert super().run(**kwargs) == self.EX_OK
status = testcase.TestCase.EX_RUN_ERROR
if self.orig_cloud.get_role("admin"):
role_name = "admin"
@@ -204,7 +204,7 @@ class Vmtp(singlevm.VmReady2):
def clean(self):
try:
assert self.cloud
- super(Vmtp, self).clean()
+ super().clean()
os.remove(self.privkey_filename)
os.remove(self.pubkey_filename)
self.cloud.delete_network("pns-internal-net_{}".format(self.guid))
diff --git a/functest/opnfv_tests/openstack/vping/vping_ssh.py b/functest/opnfv_tests/openstack/vping/vping_ssh.py
index a7bbfc23c..2eb040b0e 100644
--- a/functest/opnfv_tests/openstack/vping/vping_ssh.py
+++ b/functest/opnfv_tests/openstack/vping/vping_ssh.py
@@ -29,11 +29,11 @@ class VPingSSH(singlevm.SingleVm2):
"""Initialize testcase."""
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_ssh"
- super(VPingSSH, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.vm2 = None
def prepare(self):
- super(VPingSSH, self).prepare()
+ super().prepare()
self.vm2 = self.boot_vm(
'{}-vm2_{}'.format(self.case_name, self.guid),
security_groups=[self.sec.id])
@@ -60,4 +60,4 @@ class VPingSSH(singlevm.SingleVm2):
self.cloud.delete_server(
self.vm2, wait=True,
timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
- super(VPingSSH, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/openstack/vping/vping_userdata.py b/functest/opnfv_tests/openstack/vping/vping_userdata.py
index 9010895cb..5b2308ec1 100644
--- a/functest/opnfv_tests/openstack/vping/vping_userdata.py
+++ b/functest/opnfv_tests/openstack/vping/vping_userdata.py
@@ -26,7 +26,7 @@ class VPingUserdata(singlevm.VmReady2):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vping_userdata"
- super(VPingUserdata, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.logger = logging.getLogger(__name__)
self.vm1 = None
self.vm2 = None
@@ -39,7 +39,7 @@ class VPingUserdata(singlevm.VmReady2):
"""
try:
assert self.cloud
- assert super(VPingUserdata, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
self.vm1 = self.boot_vm()
@@ -134,4 +134,4 @@ class VPingUserdata(singlevm.VmReady2):
self.cloud.delete_server(
self.vm2, wait=True,
timeout=getattr(config.CONF, 'vping_vm_delete_timeout'))
- super(VPingUserdata, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/sdn/odl/odl.py b/functest/opnfv_tests/sdn/odl/odl.py
index b54f0f54b..b8c56b1d3 100644
--- a/functest/opnfv_tests/sdn/odl/odl.py
+++ b/functest/opnfv_tests/sdn/odl/odl.py
@@ -49,7 +49,7 @@ class ODLTests(robotframework.RobotFramework):
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
- super(ODLTests, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.res_dir = os.path.join(
getattr(config.CONF, 'dir_results'), 'odl')
self.xml_file = os.path.join(self.res_dir, 'output.xml')
@@ -135,7 +135,7 @@ class ODLTests(robotframework.RobotFramework):
else:
if not self.set_robotframework_vars(odlusername, odlpassword):
return self.EX_RUN_ERROR
- return super(ODLTests, self).run(variable=variable, suites=suites)
+ return super().run(variable=variable, suites=suites)
def run(self, **kwargs):
"""Run suites in OPNFV environment
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py
index 5049bd0bb..f5557f592 100644
--- a/functest/opnfv_tests/vnf/epc/juju_epc.py
+++ b/functest/opnfv_tests/vnf/epc/juju_epc.py
@@ -83,7 +83,7 @@ class JujuEpc(singlevm.SingleVm2):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "juju_epc"
- super(JujuEpc, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
self.case_dir = pkg_resources.resource_filename(
@@ -91,8 +91,8 @@ class JujuEpc(singlevm.SingleVm2):
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
requirements=functest_utils.get_parameter_from_yaml(
@@ -212,7 +212,7 @@ class JujuEpc(singlevm.SingleVm2):
return not stdout.channel.recv_exit_status()
def publish_image_alt(self, name=None):
- image_alt = super(JujuEpc, self).publish_image_alt(name)
+ image_alt = super().publish_image_alt(name)
region_name = self.cloud.region_name if self.cloud.region_name else (
'RegionOne')
(_, stdout, stderr) = self.ssh.exec_command(
@@ -375,7 +375,7 @@ class JujuEpc(singlevm.SingleVm2):
except OSError as ex:
if ex.errno != errno.EEXIST:
self.__logger.exception("Cannot create %s", self.res_dir)
- raise Exception
+ raise Exception from ex
self.__logger.info("ENV:\n%s", env.string())
try:
assert self._install_juju()
@@ -407,7 +407,7 @@ class JujuEpc(singlevm.SingleVm2):
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(JujuEpc, self).clean()
+ super().clean()
def sig_test_format(sig_test):
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
index d937cc052..4428990a8 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
@@ -53,14 +53,14 @@ class CloudifyIms(cloudify.Cloudify):
"""Initialize CloudifyIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
- super(CloudifyIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/ims')
@@ -114,7 +114,7 @@ class CloudifyIms(cloudify.Cloudify):
network, security group, fip, VM creation
"""
- assert super(CloudifyIms, self).execute() == 0
+ assert super().execute() == 0
start_time = time.time()
self.orig_cloud.set_network_quotas(
self.project.project.name,
@@ -259,4 +259,4 @@ class CloudifyIms(cloudify.Cloudify):
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(CloudifyIms, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/vnf/ims/heat_ims.py b/functest/opnfv_tests/vnf/ims/heat_ims.py
index 4edb2ddd9..60478adcb 100644
--- a/functest/opnfv_tests/vnf/ims/heat_ims.py
+++ b/functest/opnfv_tests/vnf/ims/heat_ims.py
@@ -57,14 +57,14 @@ class HeatIms(singlevm.VmReady2):
"""Initialize HeatIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "heat_ims"
- super(HeatIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/ims')
@@ -137,7 +137,7 @@ class HeatIms(singlevm.VmReady2):
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(HeatIms, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
if not self.execute():
@@ -247,6 +247,6 @@ class HeatIms(singlevm.VmReady2):
pass
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean stack ressources")
- super(HeatIms, self).clean()
+ super().clean()
if self.role:
self.orig_cloud.delete_role(self.role.id)
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
index 82f57ca7b..93779f4f8 100644
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
+++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
@@ -51,14 +51,14 @@ class CloudifyVrouter(cloudify.Cloudify):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vyos_vrouter"
- super(CloudifyVrouter, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/router')
@@ -127,7 +127,7 @@ class CloudifyVrouter(cloudify.Cloudify):
network, security group, fip, VM creation
"""
# network creation
- super(CloudifyVrouter, self).execute()
+ super().execute()
start_time = time.time()
self.put_private_key()
self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
@@ -231,4 +231,4 @@ class CloudifyVrouter(cloudify.Cloudify):
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(CloudifyVrouter, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
index 0a56913b7..7c532d9a3 100644
--- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
+++ b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
@@ -32,13 +32,12 @@ class FunctionTestExec():
credentials = util_info["credentials"]
self.vnf_ctrl = VnfController(util_info)
- test_cmd_map_file = open(
- os.path.join(
- self.util.vnf_data_dir, self.util.command_template_dir,
- self.util.test_cmd_map_yaml_file),
- 'r')
- self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
- test_cmd_map_file.close()
+ with open(
+ os.path.join(
+ self.util.vnf_data_dir, self.util.command_template_dir,
+ self.util.test_cmd_map_yaml_file),
+ 'r') as test_cmd_map_file:
+ self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
self.util.set_credentials(credentials["cloud"])
diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py
index 2db3b38e5..7339573d7 100644
--- a/functest/opnfv_tests/vnf/router/utilvnf.py
+++ b/functest/opnfv_tests/vnf/router/utilvnf.py
@@ -212,24 +212,21 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
def write_result_data(self, result_data):
test_result = []
if not os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "w")
- file_fd.close()
+ with open(self.test_result_json_file, "w") as file_fd:
+ pass
else:
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(self.test_result_json_file, "r") as file_fd:
+ test_result = json.load(file_fd)
test_result.append(result_data)
- file_fd = open(self.test_result_json_file, "w")
- json.dump(test_result, file_fd)
- file_fd.close()
+ with open(self.test_result_json_file, "w") as file_fd:
+ json.dump(test_result, file_fd)
def output_test_result_json(self):
if os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(self.test_result_json_file, "r") as file_fd:
+ test_result = json.load(file_fd)
output_json_data = json.dumps(test_result,
sort_keys=True,
indent=4)
@@ -239,8 +236,6 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
@staticmethod
def get_test_scenario(file_path):
- test_scenario_file = open(file_path,
- 'r')
- test_scenario_yaml = yaml.safe_load(test_scenario_file)
- test_scenario_file.close()
+ with open(file_path, 'r') as test_scenario_file:
+ test_scenario_yaml = yaml.safe_load(test_scenario_file)
return test_scenario_yaml["test_scenario_list"]
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
index b159ddda4..db276cfdb 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
@@ -101,10 +101,8 @@ class VmController():
def command_create_and_execute(self, ssh, test_cmd_file_path,
cmd_input_param, prompt_file_path):
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
config_mode_prompt = prompt["config_mode"]
commands = self.command_gen_from_template(test_cmd_file_path,
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
index 7ed287c6e..4ab394760 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
@@ -49,10 +49,8 @@ class VnfController():
def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
parameter_file_path, prompt_file_path):
# pylint: disable=too-many-arguments
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(parameter_file_path, 'r') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"]
@@ -71,19 +69,15 @@ class VnfController():
res_dict_data_list = []
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(parameter_file_path, 'r') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
cmd_input_param["destination_ip"] = reference_vnf[
"data_plane_network_ip"]
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
terminal_mode_prompt = prompt["terminal_mode"]
ssh = SshClient(target_vnf["floating_ip"],