aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/vnf
diff options
context:
space:
mode:
Diffstat (limited to 'functest/opnfv_tests/vnf')
-rw-r--r--functest/opnfv_tests/vnf/epc/juju_epc.py362
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater.py29
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.py12
-rw-r--r--functest/opnfv_tests/vnf/ims/heat_ims.py19
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.py14
-rw-r--r--functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py15
-rw-r--r--functest/opnfv_tests/vnf/router/utilvnf.py39
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py2
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py8
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py22
10 files changed, 255 insertions, 267 deletions
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py
index e08e7eb5d..1cf240b80 100644
--- a/functest/opnfv_tests/vnf/epc/juju_epc.py
+++ b/functest/opnfv_tests/vnf/epc/juju_epc.py
@@ -14,11 +14,11 @@ import os
import time
import json
import re
-import subprocess
import sys
from copy import deepcopy
import pkg_resources
+import scp
from functest.core import singlevm
from functest.utils import config
@@ -58,7 +58,7 @@ CREDS_TEMPLATE = """credentials:
username: {user_n}"""
-class JujuEpc(singlevm.VmReady2):
+class JujuEpc(singlevm.SingleVm2):
# pylint:disable=too-many-instance-attributes
"""Abot EPC deployed with JUJU Orchestrator Case"""
@@ -74,26 +74,25 @@ class JujuEpc(singlevm.VmReady2):
flavor_ram = 2048
flavor_vcpus = 1
flavor_disk = 10
-
flavor_alt_ram = 4096
flavor_alt_vcpus = 1
flavor_alt_disk = 10
-
+ username = 'ubuntu'
juju_timeout = '4800'
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "juju_epc"
- super(JujuEpc, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/epc')
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.config_file = os.path.join(self.case_dir, self.config)
self.orchestrator = dict(
requirements=functest_utils.get_parameter_from_yaml(
@@ -139,25 +138,27 @@ class JujuEpc(singlevm.VmReady2):
try:
self.public_auth_url = self.get_public_auth_url(self.orig_cloud)
if not self.public_auth_url.endswith(('v3', 'v3/')):
- self.public_auth_url = "{}/v3".format(self.public_auth_url)
+ self.public_auth_url = f"{self.public_auth_url}/v3"
except Exception: # pylint: disable=broad-except
self.public_auth_url = None
self.sec = None
self.image_alt = None
self.flavor_alt = None
- def check_requirements(self):
- if not os.path.exists("/src/epc-requirements/go/bin/juju"):
- self.__logger.warning(
- "Juju cannot be cross-compiled (arm and arm64) from the time "
- "being")
- self.is_skipped = True
- self.project.clean()
- if env.get('NEW_USER_ROLE').lower() == "admin":
- self.__logger.warning(
- "Defining NEW_USER_ROLE=admin will easily break the testcase "
- "because Juju doesn't manage tenancy (e.g. subnet "
- "overlapping)")
+ def _install_juju(self):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo snap install juju --channel=2.3/stable --classic')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _install_juju_wait(self):
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo apt-get update && sudo apt-get install python3-pip -y && '
+ 'sudo pip3 install juju_wait===2.6.4')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def _register_cloud(self):
assert self.public_auth_url
@@ -167,11 +168,15 @@ class JujuEpc(singlevm.VmReady2):
'url': self.public_auth_url,
'region': self.cloud.region_name if self.cloud.region_name else (
'RegionOne')}
- with open(clouds_yaml, 'w') as yfile:
+ with open(clouds_yaml, 'w', encoding='utf-8') as yfile:
yfile.write(CLOUD_TEMPLATE.format(**cloud_data))
- cmd = ['juju', 'add-cloud', 'abot-epc', '-f', clouds_yaml, '--replace']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(clouds_yaml, remote_path='~/')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju add-cloud abot-epc -f clouds.yaml --replace')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def _register_credentials(self):
self.__logger.info("Creating Credentials for Abot-epc .....")
@@ -184,48 +189,38 @@ class JujuEpc(singlevm.VmReady2):
"project_domain_name", "Default"),
'user_domain_n': self.cloud.auth.get(
"user_domain_name", "Default")}
- with open(credentials_yaml, 'w') as yfile:
+ with open(credentials_yaml, 'w', encoding='utf-8') as yfile:
yfile.write(CREDS_TEMPLATE.format(**creds_data))
- cmd = ['juju', 'add-credential', 'abot-epc', '-f', credentials_yaml,
- '--replace', '--debug']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
-
- def prepare(self):
- """Prepare testcase (Additional pre-configuration steps)."""
- assert self.public_auth_url
- self.__logger.info("Additional pre-configuration steps")
- try:
- os.makedirs(self.res_dir)
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- self.__logger.exception("Cannot create %s", self.res_dir)
- raise Exception
-
- self.__logger.info("ENV:\n%s", env.string())
- self._register_cloud()
- self._register_credentials()
-
- def publish_image(self, name=None):
- image = super(JujuEpc, self).publish_image(name)
- cmd = ['juju', 'metadata', 'generate-image', '-d', '/root',
- '-i', image.id, '-s', 'xenial',
- '-r', self.cloud.region_name if self.cloud.region_name else (
- 'RegionOne'),
- '-u', self.public_auth_url]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- return image
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(credentials_yaml, remote_path='~/')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju add-credential abot-epc -f credentials.yaml '
+ ' --replace --debug')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
+
+ def _publish_image(self):
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju metadata generate-image -d /home/ubuntu '
+ f'-i {self.image.id} -s xenial -r {region_name} '
+ f'-u {self.public_auth_url}')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def publish_image_alt(self, name=None):
- image_alt = super(JujuEpc, self).publish_image_alt(name)
- cmd = ['juju', 'metadata', 'generate-image', '-d', '/root',
- '-i', image_alt.id, '-s', 'trusty',
- '-r', self.cloud.region_name if self.cloud.region_name else (
- 'RegionOne'),
- '-u', self.public_auth_url]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
+ image_alt = super().publish_image_alt(name)
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju metadata generate-image -d /home/ubuntu '
+ f'-i {image_alt.id} -s trusty -r {region_name} '
+ f'-u {self.public_auth_url}')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
return image_alt
def deploy_orchestrator(self): # pylint: disable=too-many-locals
@@ -234,47 +229,39 @@ class JujuEpc(singlevm.VmReady2):
Bootstrap juju
"""
+ self._publish_image()
self.image_alt = self.publish_image_alt()
self.flavor_alt = self.create_flavor_alt()
self.__logger.info("Starting Juju Bootstrap process...")
- try:
- cmd = ['timeout', JujuEpc.juju_timeout,
- 'juju', 'bootstrap',
- 'abot-epc/{}'.format(
- self.cloud.region_name if self.cloud.region_name else (
- 'RegionOne')),
- 'abot-controller',
- '--agent-version', '2.3.9',
- '--metadata-source', '/root',
- '--constraints', 'mem=2G',
- '--bootstrap-series', 'xenial',
- '--config', 'network={}'.format(self.network.id),
- '--config', 'ssl-hostname-verification=false',
- '--config', 'external-network={}'.format(self.ext_net.id),
- '--config', 'use-floating-ip=true',
- '--config', 'use-default-secgroup=true',
- '--debug']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju Bootstrap: %s\n%s",
- cpe.cmd, cpe.output.decode("utf-8"))
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue with Juju Bootstrap ...")
- return False
-
- return True
+ region_name = self.cloud.region_name if self.cloud.region_name else (
+ 'RegionOne')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} '
+ f'/snap/bin/juju bootstrap abot-epc/{region_name} abot-controller '
+ '--agent-version 2.3.9 --metadata-source /home/ubuntu '
+ '--constraints mem=2G --bootstrap-series xenial '
+ f'--config network={self.network.id} '
+ '--config ssl-hostname-verification=false '
+ f'--config external-network={self.ext_net.id} '
+ '--config use-floating-ip=true '
+ '--config use-default-secgroup=true '
+ '--debug')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def check_app(self, name='abot-epc-basic', status='active'):
"""Check application status."""
- cmd = ['juju', 'status', '--format', 'short', name]
for i in range(10):
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'/snap/bin/juju status --format short {name}')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ continue
ret = re.search(
- r'(?=workload:({})\))'.format(status), output.decode("utf-8"))
+ rf'(?=workload:({status})\))', output)
if ret:
self.__logger.info("%s workload is %s", name, status)
break
@@ -289,68 +276,81 @@ class JujuEpc(singlevm.VmReady2):
def deploy_vnf(self):
"""Deploy ABOT-OAI-EPC."""
self.__logger.info("Upload VNFD")
- descriptor = self.vnf['descriptor']
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(
+ '/src/epc-requirements/abot_charm', remote_path='~/',
+ recursive=True)
self.__logger.info("Deploying Abot-epc bundle file ...")
- cmd = ['juju', 'deploy', '{}'.format(descriptor.get('file_name'))]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- self.__logger.info("Waiting for instances .....")
- try:
- cmd = ['timeout', JujuEpc.juju_timeout, 'juju-wait']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- self.__logger.info("Deployed Abot-epc on Openstack")
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju VNF Deployment: %s\n%s",
- cpe.cmd, cpe.output.decode("utf-8"))
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue with the VNF Deployment ..")
- return False
-
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'sudo mkdir -p /src/epc-requirements && '
+ 'sudo mv abot_charm /src/epc-requirements/abot_charm && '
+ '/snap/bin/juju deploy '
+ '/src/epc-requirements/abot_charm/functest-abot-epc-bundle/'
+ 'bundle.yaml')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'PATH=/snap/bin/:$PATH '
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
self.__logger.info("Checking status of ABot and EPC units ...")
- cmd = ['juju', 'status']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.debug("%s\n%s", " ".join(cmd), output.decode("utf-8"))
+ (_, stdout, stderr) = self.ssh.exec_command('/snap/bin/juju status')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
for app in ['abot-epc-basic', 'oai-epc', 'oai-hss']:
if not self.check_app(app):
return False
-
- self.__logger.info("Transferring the feature files to Abot_node ...")
- cmd = ['timeout', '60', 'juju', 'scp', '--', '-r', '-v',
- '{}/featureFiles'.format(self.case_dir),
- 'abot-epc-basic/0:/etc/rebaca-test-suite/']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
-
- return True
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(
+ f'{self.case_dir}/featureFiles', remote_path='~/',
+ recursive=True)
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp -- -r -v '
+ '~/featureFiles abot-epc-basic/0:/etc/rebaca-test-suite/')
+ output = stdout.read().decode("utf-8")
+ self.__logger.debug("stdout:\n%s", output)
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ return not stdout.channel.recv_exit_status()
def test_vnf(self):
"""Run test on ABoT."""
start_time = time.time()
- self.__logger.info("Running VNF Test cases....")
- cmd = ['juju', 'run-action', 'abot-epc-basic/0', 'run',
- 'tagnames={}'.format(self.details['test_vnf']['tag_name'])]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
-
- cmd = ['timeout', JujuEpc.juju_timeout, 'juju-wait']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
-
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "/snap/bin/juju run-action abot-epc-basic/0 "
+ f"run tagnames={self.details['test_vnf']['tag_name']}")
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ (_, stdout, stderr) = self.ssh.exec_command(
+ 'PATH=/snap/bin/:$PATH '
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
duration = time.time() - start_time
self.__logger.info("Getting results from Abot node....")
- cmd = ['timeout', JujuEpc.juju_timeout,
- 'juju', 'scp', '--', '-v',
- 'abot-epc-basic/0:'
- '/var/lib/abot-epc-basic/artifacts/TestResults.json',
- '{}/.'.format(self.res_dir)]
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp '
+ '-- -v abot-epc-basic/0:'
+ '/var/lib/abot-epc-basic/artifacts/TestResults.json .')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ if stdout.channel.recv_exit_status():
+ return not stdout.channel.recv_exit_status()
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.get('TestResults.json', self.res_dir)
self.__logger.info("Parsing the Test results...")
- res = (process_abot_test_result('{}/TestResults.json'.format(
- self.res_dir)))
+ res = process_abot_test_result(f'{self.res_dir}/TestResults.json')
short_result = sig_test_format(res)
self.__logger.info(short_result)
self.details['test_vnf'].update(
@@ -362,52 +362,48 @@ class JujuEpc(singlevm.VmReady2):
short_result['failures'], short_result['skipped'])
return True
- def run(self, **kwargs):
- self.start_time = time.time()
+ def execute(self):
+ """Prepare testcase (Additional pre-configuration steps)."""
+ assert self.public_auth_url
+ self.__logger.info("Additional pre-configuration steps")
+ try:
+ os.makedirs(self.res_dir)
+ except OSError as ex:
+ if ex.errno != errno.EEXIST:
+ self.__logger.exception("Cannot create %s", self.res_dir)
+ raise Exception from ex
+ self.__logger.info("ENV:\n%s", env.string())
try:
- assert super(JujuEpc, self).run(**kwargs) == self.EX_OK
- self.prepare()
- if (self.deploy_orchestrator() and
- self.deploy_vnf() and
- self.test_vnf()):
- self.stop_time = time.time()
- self.result = 100
- return self.EX_OK
- self.result = 0
- self.stop_time = time.time()
- return self.EX_TESTCASE_FAILED
+ assert self._install_juju()
+ assert self._install_juju_wait()
+ assert self._register_cloud()
+ assert self._register_credentials()
+ assert self.deploy_orchestrator()
+ assert self.deploy_vnf()
+ assert self.test_vnf()
except Exception: # pylint: disable=broad-except
- self.result = 0
- self.stop_time = time.time()
- self.__logger.exception("Exception on VNF testing")
- return self.EX_TESTCASE_FAILED
+ self.__logger.exception("juju_epc failed")
+ return 1
+ return 0
def clean(self):
"""Clean created objects/functions."""
- try:
- cmd = ['juju', 'debug-log', '--replay', '--no-tail']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.debug(
- "%s\n%s", " ".join(cmd), output.decode("utf-8"))
- self.__logger.info("Destroying Orchestrator...")
- cmd = ['timeout', JujuEpc.juju_timeout,
- 'juju', 'destroy-controller', '-y', 'abot-controller',
- '--destroy-all-models']
- output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
- self.__logger.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- except subprocess.CalledProcessError as cpe:
- self.__logger.error(
- "Exception with Juju Cleanup: %s\n%s",
- cpe.cmd, cpe.output.decode("utf-8"))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("General issue during the undeployment ..")
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju debug-log --replay --no-tail')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ '/snap/bin/juju destroy-controller -y abot-controller '
+ '--destroy-all-models')
+ self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
for fip in self.cloud.list_floating_ips():
self.cloud.delete_floating_ip(fip.id)
if self.image_alt:
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(JujuEpc, self).clean()
+ super().clean()
def sig_test_format(sig_test):
@@ -433,7 +429,7 @@ def sig_test_format(sig_test):
def process_abot_test_result(file_path):
""" Process ABoT Result """
- with open(file_path) as test_result:
+ with open(file_path, encoding='utf-8') as test_result:
data = json.load(test_result)
res = []
for tests in data:
diff --git a/functest/opnfv_tests/vnf/ims/clearwater.py b/functest/opnfv_tests/vnf/ims/clearwater.py
index 67128b11c..4c143fd70 100644
--- a/functest/opnfv_tests/vnf/ims/clearwater.py
+++ b/functest/opnfv_tests/vnf/ims/clearwater.py
@@ -50,7 +50,7 @@ class ClearwaterTesting():
output_dict = {}
self.logger.debug('Ellis IP: %s', self.ellis_ip)
output_dict['ellis_ip'] = self.ellis_ip
- account_url = 'http://{0}/accounts'.format(self.ellis_ip)
+ account_url = f'http://{self.ellis_ip}/accounts'
params = {"password": "functest",
"full_name": "opnfv functest user",
"email": "functest@opnfv.org",
@@ -60,7 +60,7 @@ class ClearwaterTesting():
number_res = self._create_ellis_account(account_url, params)
output_dict['number'] = number_res
- session_url = 'http://{0}/session'.format(self.ellis_ip)
+ session_url = f'http://{self.ellis_ip}/session'
session_data = {
'username': params['email'],
'password': params['password'],
@@ -68,8 +68,8 @@ class ClearwaterTesting():
}
cookies = self._get_ellis_session_cookies(session_url, session_data)
- number_url = 'http://{0}/accounts/{1}/numbers'.format(
- self.ellis_ip, params['email'])
+ number_url = (
+ f"http://{self.ellis_ip}/accounts/{params['email']}/numbers")
self.logger.debug('Create 1st calling number on Ellis')
number_res = self._create_ellis_number(number_url, cookies)
@@ -97,8 +97,7 @@ class ClearwaterTesting():
"try %s: cannot create ellis account", iloop + 1)
time.sleep(30)
raise Exception(
- "Unable to create an account {}".format(
- params.get('full_name')))
+ f"Unable to create an account {params.get('full_name')}")
def _get_ellis_session_cookies(self, session_url, params):
i = 15
@@ -150,24 +149,20 @@ class ClearwaterTesting():
"""
# pylint: disable=too-many-locals,too-many-arguments
self.logger.info('Run Clearwater live test')
- script = ('cd {0};'
- 'rake test[{1}] SIGNUP_CODE={2}'
- .format(self.test_dir,
- public_domain,
- signup_code))
+ script = (f'cd {self.test_dir};'
+ f'rake test[{public_domain}] SIGNUP_CODE={signup_code}')
if self.bono_ip and self.ellis_ip:
- subscript = ' PROXY={0} ELLIS={1}'.format(
- self.bono_ip, self.ellis_ip)
- script = '{0}{1}'.format(script, subscript)
- script = ('{0}{1}'.format(script, ' --trace'))
- cmd = "/bin/bash -c '{0}'".format(script)
+ subscript = f' PROXY={self.bono_ip} ELLIS={self.ellis_ip}'
+ script = f'{script}{subscript}'
+ script = f'{script} --trace'
+ cmd = f"/bin/sh -c '{script}'"
self.logger.debug('Live test cmd: %s', cmd)
output_file = os.path.join(self.result_dir, "ims_test_output.txt")
ft_utils.execute_command(cmd,
error_msg='Clearwater live test failed',
output_file=output_file)
- with open(output_file, 'r') as ofile:
+ with open(output_file, 'r', encoding='utf-8') as ofile:
result = ofile.read()
if result != "":
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
index d937cc052..b93af7d6d 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
@@ -53,14 +53,14 @@ class CloudifyIms(cloudify.Cloudify):
"""Initialize CloudifyIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
- super(CloudifyIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/ims')
@@ -114,7 +114,7 @@ class CloudifyIms(cloudify.Cloudify):
network, security group, fip, VM creation
"""
- assert super(CloudifyIms, self).execute() == 0
+ assert super().execute() == 0
start_time = time.time()
self.orig_cloud.set_network_quotas(
self.project.project.name,
@@ -259,4 +259,4 @@ class CloudifyIms(cloudify.Cloudify):
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(CloudifyIms, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/vnf/ims/heat_ims.py b/functest/opnfv_tests/vnf/ims/heat_ims.py
index 4a57a7445..0d4e345a0 100644
--- a/functest/opnfv_tests/vnf/ims/heat_ims.py
+++ b/functest/opnfv_tests/vnf/ims/heat_ims.py
@@ -57,14 +57,14 @@ class HeatIms(singlevm.VmReady2):
"""Initialize HeatIms testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "heat_ims"
- super(HeatIms, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/ims')
@@ -112,9 +112,10 @@ class HeatIms(singlevm.VmReady2):
project=self.project.project.id,
domain=self.project.domain.id)
self.keypair = self.cloud.create_keypair(
- '{}-kp_{}'.format(self.case_name, self.guid))
+ f'{self.case_name}-kp_{self.guid}')
self.__logger.info("keypair:\n%s", self.keypair.private_key)
- with open(self.key_filename, 'w') as private_key_file:
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
private_key_file.write(self.keypair.private_key)
if self.deploy_vnf() and self.test_vnf():
@@ -137,7 +138,7 @@ class HeatIms(singlevm.VmReady2):
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(HeatIms, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
if not self.execute():
@@ -162,7 +163,7 @@ class HeatIms(singlevm.VmReady2):
server.public_v4, username=username,
key_filename=self.key_filename, timeout=timeout)
(_, stdout, _) = ssh.exec_command('sudo monit summary')
- self.__logger.info("output:\n%s", stdout.read())
+ self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
ssh.close()
def deploy_vnf(self):
@@ -247,6 +248,6 @@ class HeatIms(singlevm.VmReady2):
pass
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean stack ressources")
- super(HeatIms, self).clean()
+ super().clean()
if self.role:
self.orig_cloud.delete_role(self.role.id)
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
index c793953d3..32d675347 100644
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
+++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
@@ -40,6 +40,8 @@ class CloudifyVrouter(cloudify.Cloudify):
flavor_alt_vcpus = 1
flavor_alt_disk = 3
+ check_console_loop = 12
+
cop_yaml = ("https://github.com/cloudify-cosmo/cloudify-openstack-plugin/"
"releases/download/2.14.7/plugin.yaml")
cop_wgn = ("https://github.com/cloudify-cosmo/cloudify-openstack-plugin/"
@@ -49,14 +51,14 @@ class CloudifyVrouter(cloudify.Cloudify):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "vyos_vrouter"
- super(CloudifyVrouter, self).__init__(**kwargs)
+ super().__init__(**kwargs)
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
- except Exception:
- raise Exception("VNF config file not found")
+ config.CONF, f'vnf_{self.case_name}_config')
+ except Exception as exc:
+ raise Exception("VNF config file not found") from exc
self.case_dir = pkg_resources.resource_filename(
'functest', 'opnfv_tests/vnf/router')
@@ -125,7 +127,7 @@ class CloudifyVrouter(cloudify.Cloudify):
network, security group, fip, VM creation
"""
# network creation
- super(CloudifyVrouter, self).execute()
+ super().execute()
start_time = time.time()
self.put_private_key()
self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
@@ -229,4 +231,4 @@ class CloudifyVrouter(cloudify.Cloudify):
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
self.orig_cloud.delete_flavor(self.flavor_alt.id)
- super(CloudifyVrouter, self).clean()
+ super().clean()
diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
index 0a56913b7..9eb3c5d69 100644
--- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
+++ b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
@@ -32,17 +32,16 @@ class FunctionTestExec():
credentials = util_info["credentials"]
self.vnf_ctrl = VnfController(util_info)
- test_cmd_map_file = open(
- os.path.join(
- self.util.vnf_data_dir, self.util.command_template_dir,
- self.util.test_cmd_map_yaml_file),
- 'r')
- self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
- test_cmd_map_file.close()
+ with open(
+ os.path.join(
+ self.util.vnf_data_dir, self.util.command_template_dir,
+ self.util.test_cmd_map_yaml_file),
+ 'r', encoding='utf-8') as test_cmd_map_file:
+ self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
self.util.set_credentials(credentials["cloud"])
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py
index 2db3b38e5..111f20c1a 100644
--- a/functest/opnfv_tests/vnf/router/utilvnf.py
+++ b/functest/opnfv_tests/vnf/router/utilvnf.py
@@ -64,7 +64,7 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
if not os.path.exists(self.vnf_data_dir):
os.makedirs(self.vnf_data_dir)
- with open(self.test_env_config_yaml) as file_fd:
+ with open(self.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -98,9 +98,7 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
return mac_address
def get_blueprint_outputs(self, cfy_manager_ip, deployment_name):
- url = "http://%s/deployments/%s/outputs" % (
- cfy_manager_ip, deployment_name)
-
+ url = f"http://{cfy_manager_ip}/deployments/{deployment_name}/outputs"
response = requests.get(
url,
auth=requests.auth.HTTPBasicAuth('admin', 'admin'),
@@ -212,24 +210,29 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
def write_result_data(self, result_data):
test_result = []
if not os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "w")
- file_fd.close()
+ with open(
+ self.test_result_json_file, "w",
+ encoding="utf-8") as file_fd:
+ pass
else:
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(
+ self.test_result_json_file, "r",
+ encoding="utf-8") as file_fd:
+ test_result = json.load(file_fd)
test_result.append(result_data)
- file_fd = open(self.test_result_json_file, "w")
- json.dump(test_result, file_fd)
- file_fd.close()
+ with open(
+ self.test_result_json_file, "w",
+ encoding="utf-8") as file_fd:
+ json.dump(test_result, file_fd)
def output_test_result_json(self):
if os.path.isfile(self.test_result_json_file):
- file_fd = open(self.test_result_json_file, "r")
- test_result = json.load(file_fd)
- file_fd.close()
+ with open(
+ self.test_result_json_file, "r",
+ encoding="utf-8") as file_fd:
+ test_result = json.load(file_fd)
output_json_data = json.dumps(test_result,
sort_keys=True,
indent=4)
@@ -239,8 +242,6 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
@staticmethod
def get_test_scenario(file_path):
- test_scenario_file = open(file_path,
- 'r')
- test_scenario_yaml = yaml.safe_load(test_scenario_file)
- test_scenario_file.close()
+ with open(file_path, "r", encoding="utf-8") as test_scenario_file:
+ test_scenario_yaml = yaml.safe_load(test_scenario_file)
return test_scenario_yaml["test_scenario_list"]
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
index 0969eab3b..269f6526b 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
@@ -43,7 +43,7 @@ class SshClient(): # pylint: disable=too-many-instance-attributes
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.util = Utilvnf()
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
index b159ddda4..2210b3909 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
@@ -36,7 +36,7 @@ class VmController():
self.util = Utilvnf()
self.util.set_credentials(credentials["cloud"])
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -101,10 +101,8 @@ class VmController():
def command_create_and_execute(self, ssh, test_cmd_file_path,
cmd_input_param, prompt_file_path):
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
config_mode_prompt = prompt["config_mode"]
commands = self.command_gen_from_template(test_cmd_file_path,
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
index 7ed287c6e..46584456f 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
@@ -36,7 +36,7 @@ class VnfController():
self.util = Utilvnf()
self.vm_controller = VmController(util_info)
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -49,10 +49,9 @@ class VnfController():
def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
parameter_file_path, prompt_file_path):
# pylint: disable=too-many-arguments
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(
+ parameter_file_path, 'r', encoding='utf-8') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"]
@@ -71,19 +70,16 @@ class VnfController():
res_dict_data_list = []
- parameter_file = open(parameter_file_path,
- 'r')
- cmd_input_param = yaml.safe_load(parameter_file)
- parameter_file.close()
+ with open(
+ parameter_file_path, 'r', encoding='utf-8') as parameter_file:
+ cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
cmd_input_param["destination_ip"] = reference_vnf[
"data_plane_network_ip"]
- prompt_file = open(prompt_file_path,
- 'r')
- prompt = yaml.safe_load(prompt_file)
- prompt_file.close()
+ with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
+ prompt = yaml.safe_load(prompt_file)
terminal_mode_prompt = prompt["terminal_mode"]
ssh = SshClient(target_vnf["floating_ip"],