aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.travis.yml2
-rw-r--r--docker/core/Dockerfile6
-rw-r--r--functest/core/cloudify.py18
-rw-r--r--functest/core/singlevm.py59
-rw-r--r--functest/core/tenantnetwork.py29
-rw-r--r--functest/opnfv_tests/openstack/barbican/barbican.py2
-rw-r--r--functest/opnfv_tests/openstack/cinder/cinder_test.py8
-rw-r--r--functest/opnfv_tests/openstack/patrole/patrole.py2
-rw-r--r--functest/opnfv_tests/openstack/rally/rally.py64
-rw-r--r--functest/opnfv_tests/openstack/refstack/refstack.py18
-rw-r--r--functest/opnfv_tests/openstack/shaker/shaker.py34
-rw-r--r--functest/opnfv_tests/openstack/tempest/tempest.py71
-rw-r--r--functest/opnfv_tests/openstack/vmtp/vmtp.py38
-rw-r--r--functest/opnfv_tests/openstack/vping/vping_ssh.py9
-rw-r--r--functest/opnfv_tests/openstack/vping/vping_userdata.py10
-rw-r--r--functest/opnfv_tests/sdn/odl/odl.py13
-rw-r--r--functest/opnfv_tests/vnf/epc/juju_epc.py58
-rw-r--r--functest/opnfv_tests/vnf/ims/clearwater.py29
-rw-r--r--functest/opnfv_tests/vnf/ims/cloudify_ims.py2
-rw-r--r--functest/opnfv_tests/vnf/ims/heat_ims.py7
-rw-r--r--functest/opnfv_tests/vnf/router/cloudify_vrouter.py2
-rw-r--r--functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py4
-rw-r--r--functest/opnfv_tests/vnf/router/utilvnf.py24
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py2
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py4
-rw-r--r--functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py10
-rw-r--r--functest/tests/unit/odl/test_odl.py49
-rw-r--r--functest/tests/unit/openstack/cinder/test_cinder.py11
-rw-r--r--functest/tests/unit/openstack/rally/test_rally.py22
-rw-r--r--functest/tests/unit/openstack/tempest/test_tempest.py4
-rw-r--r--functest/tests/unit/openstack/vmtp/test_vmtp.py10
-rw-r--r--functest/tests/unit/openstack/vping/test_vping_ssh.py8
-rw-r--r--functest/tests/unit/utils/test_functest_utils.py16
-rw-r--r--functest/utils/config.py11
-rw-r--r--functest/utils/functest_utils.py17
-rw-r--r--test-requirements.txt1
-rw-r--r--tox.ini7
-rw-r--r--upper-constraints.txt19
38 files changed, 344 insertions, 356 deletions
diff --git a/.travis.yml b/.travis.yml
index 16c5c457f..ba6310af5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,7 +18,7 @@ jobs:
- stage: run unit tests
script: >
tox -e \
- docs,pep8,pylint,yamllint,ansiblelint,bashate,bandit,py38,cover
+ docs,pep8,pylint,yamllint,bashate,bandit,py38,cover
- stage: build functest-core images
script: sudo -E bash build.sh
env:
diff --git a/docker/core/Dockerfile b/docker/core/Dockerfile
index d955920f9..5bfe9a661 100644
--- a/docker/core/Dockerfile
+++ b/docker/core/Dockerfile
@@ -8,10 +8,12 @@ COPY Create-new-server-in-test_create_backup.patch /tmp/Create-new-server-in-tes
RUN apk -U upgrade && \
apk --no-cache add --update \
python3 py3-wheel libffi openssl libjpeg-turbo py3-pip bash \
- grep sed wget ca-certificates git openssh-client qemu-img iputils coreutils mailcap libstdc++ && \
+ grep sed wget ca-certificates git openssh-client qemu-img iputils coreutils mailcap libstdc++ \
+ libxml2 libxslt && \
apk --no-cache add --virtual .build-deps --update \
python3-dev build-base linux-headers libffi-dev \
- openssl-dev libjpeg-turbo-dev rust cargo && \
+ openssl-dev libjpeg-turbo-dev rust cargo \
+ libxml2-dev libxslt-dev && \
apk --no-cache add --update py3-distlib\>=0.3.1 \
--repository=http://dl-cdn.alpinelinux.org/alpine/edge/main && \
git init /src/requirements && \
diff --git a/functest/core/cloudify.py b/functest/core/cloudify.py
index 58e3095d6..416fe523e 100644
--- a/functest/core/cloudify.py
+++ b/functest/core/cloudify.py
@@ -67,13 +67,12 @@ class Cloudify(singlevm.SingleVm2):
"sudo wget https://get.docker.com/ -O script.sh && "
"sudo chmod +x script.sh && "
"sudo ./script.sh && "
- "sudo docker load -i ~/{} && "
+ "sudo docker load -i "
+ f"~/{os.path.basename(self.cloudify_archive)} && "
"sudo docker run --name cfy_manager_local -d "
"--restart unless-stopped -v /sys/fs/cgroup:/sys/fs/cgroup:ro "
"--tmpfs /run --tmpfs /run/lock --security-opt seccomp:unconfined "
- "--cap-add SYS_ADMIN --network=host {}".format(
- os.path.basename(self.cloudify_archive),
- self.cloudify_container))
+ f"--cap-add SYS_ADMIN --network=host {self.cloudify_container}")
self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
self.cfy_client = CloudifyClient(
@@ -132,8 +131,8 @@ class Cloudify(singlevm.SingleVm2):
"""Upload Cloudify plugins"""
(_, stdout, stderr) = self.ssh.exec_command(
"sudo docker exec cfy_manager_local "
- "cfy plugins upload -y {} {} && "
- "sudo docker exec cfy_manager_local cfy status".format(yaml, wgn))
+ f"cfy plugins upload -y {yaml} {wgn} && "
+ "sudo docker exec cfy_manager_local cfy status")
self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
@@ -189,9 +188,8 @@ def wait_for_execution(client, execution, logger, timeout=3600, ):
if timeout is not None:
if time.time() > deadline:
raise RuntimeError(
- 'execution of operation {0} for deployment {1} '
- 'timed out'.format(execution.workflow_id,
- execution.deployment_id))
+ 'execution of operation {execution.workflow_id} for '
+ 'deployment {execution.deployment_id} timed out')
# update the remaining timeout
timeout = deadline - time.time()
@@ -219,4 +217,4 @@ def get_execution_id(client, deployment_id):
return execution
raise RuntimeError('Failed to get create_deployment_environment '
'workflow execution.'
- 'Available executions: {0}'.format(executions))
+ f'Available executions: {executions}')
diff --git a/functest/core/singlevm.py b/functest/core/singlevm.py
index b2d855b30..7bb4e16c0 100644
--- a/functest/core/singlevm.py
+++ b/functest/core/singlevm.py
@@ -80,19 +80,18 @@ class VmReady1(tenantnetwork.TenantNetwork1):
functest_utils.convert_ini_to_dict(
env.get('IMAGE_PROPERTIES')))
extra_properties.update(
- getattr(config.CONF, '{}_extra_properties'.format(
- self.case_name), {}))
+ getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
image = self.cloud.create_image(
- name if name else '{}-img_{}'.format(self.case_name, self.guid),
+ name if name else f'{self.case_name}-img_{self.guid}',
filename=getattr(
- config.CONF, '{}_image'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image',
self.filename),
meta=extra_properties,
disk_format=getattr(
- config.CONF, '{}_image_format'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_format',
self.image_format),
visibility=getattr(
- config.CONF, '{}_visibility'.format(self.case_name),
+ config.CONF, f'{self.case_name}_visibility',
self.visibility),
wait=True)
self.__logger.debug("image: %s", image)
@@ -115,20 +114,18 @@ class VmReady1(tenantnetwork.TenantNetwork1):
functest_utils.convert_ini_to_dict(
env.get('IMAGE_PROPERTIES')))
extra_alt_properties.update(
- getattr(config.CONF, '{}_extra_alt_properties'.format(
- self.case_name), {}))
+ getattr(config.CONF, f'{self.case_name}_extra_alt_properties', {}))
image = self.cloud.create_image(
- name if name else '{}-img_alt_{}'.format(
- self.case_name, self.guid),
+ name if name else f'{self.case_name}-img_alt_{self.guid}',
filename=getattr(
- config.CONF, '{}_image_alt'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_alt',
self.filename_alt),
meta=extra_alt_properties,
disk_format=getattr(
- config.CONF, '{}_image_alt_format'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_alt_format',
self.image_format),
visibility=getattr(
- config.CONF, '{}_visibility'.format(self.case_name),
+ config.CONF, f'{self.case_name}_visibility',
self.visibility),
wait=True)
self.__logger.debug("image: %s", image)
@@ -146,12 +143,12 @@ class VmReady1(tenantnetwork.TenantNetwork1):
"""
assert self.orig_cloud
flavor = self.orig_cloud.create_flavor(
- name if name else '{}-flavor_{}'.format(self.case_name, self.guid),
- getattr(config.CONF, '{}_flavor_ram'.format(self.case_name),
+ name if name else f'{self.case_name}-flavor_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_ram',
self.flavor_ram),
- getattr(config.CONF, '{}_flavor_vcpus'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_vcpus',
self.flavor_vcpus),
- getattr(config.CONF, '{}_flavor_disk'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_disk',
self.flavor_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_extra_specs = self.flavor_extra_specs.copy()
@@ -161,7 +158,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
env.get('FLAVOR_EXTRA_SPECS')))
flavor_extra_specs.update(
getattr(config.CONF,
- '{}_flavor_extra_specs'.format(self.case_name), {}))
+ f'{self.case_name}_flavor_extra_specs', {}))
self.orig_cloud.set_flavor_specs(flavor.id, flavor_extra_specs)
return flavor
@@ -177,13 +174,12 @@ class VmReady1(tenantnetwork.TenantNetwork1):
"""
assert self.orig_cloud
flavor = self.orig_cloud.create_flavor(
- name if name else '{}-flavor_alt_{}'.format(
- self.case_name, self.guid),
- getattr(config.CONF, '{}_flavor_alt_ram'.format(self.case_name),
+ name if name else f'{self.case_name}-flavor_alt_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_ram',
self.flavor_alt_ram),
- getattr(config.CONF, '{}_flavor_alt_vcpus'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_vcpus',
self.flavor_alt_vcpus),
- getattr(config.CONF, '{}_flavor_alt_disk'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_disk',
self.flavor_alt_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
@@ -193,7 +189,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
env.get('FLAVOR_EXTRA_SPECS')))
flavor_alt_extra_specs.update(
getattr(config.CONF,
- '{}_flavor_alt_extra_specs'.format(self.case_name), {}))
+ f'{self.case_name}_flavor_alt_extra_specs', {}))
self.orig_cloud.set_flavor_specs(
flavor.id, flavor_alt_extra_specs)
return flavor
@@ -210,7 +206,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
"""
assert self.cloud
vm1 = self.cloud.create_server(
- name if name else '{}-vm_{}'.format(self.case_name, self.guid),
+ name if name else f'{self.case_name}-vm_{self.guid}',
image=self.image.id, flavor=self.flavor.id,
auto_ip=False,
network=self.network.id if self.network else env.get(
@@ -393,14 +389,15 @@ class SingleVm1(VmReady1):
"""
assert self.cloud
self.keypair = self.cloud.create_keypair(
- '{}-kp_{}'.format(self.case_name, self.guid))
+ f'{self.case_name}-kp_{self.guid}')
self.__logger.debug("keypair: %s", self.keypair)
self.__logger.debug("private_key:\n%s", self.keypair.private_key)
- with open(self.key_filename, 'w') as private_key_file:
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
private_key_file.write(self.keypair.private_key)
self.sec = self.cloud.create_security_group(
- '{}-sg_{}'.format(self.case_name, self.guid),
- 'created by OPNFV Functest ({})'.format(self.case_name))
+ f'{self.case_name}-sg_{self.guid}',
+ f'created by OPNFV Functest ({self.case_name})')
self.cloud.create_security_group_rule(
self.sec.id, port_range_min='22', port_range_max='22',
protocol='tcp', direction='ingress')
@@ -434,11 +431,11 @@ class SingleVm1(VmReady1):
fip.floating_ip_address if fip else vm1.public_v4,
username=getattr(
config.CONF,
- '{}_image_user'.format(self.case_name), self.username),
+ f'{self.case_name}_image_user', self.username),
key_filename=self.key_filename,
timeout=getattr(
config.CONF,
- '{}_vm_ssh_connect_timeout'.format(self.case_name),
+ f'{self.case_name}_vm_ssh_connect_timeout',
self.ssh_connect_timeout))
break
except Exception as exc: # pylint: disable=broad-except
diff --git a/functest/core/tenantnetwork.py b/functest/core/tenantnetwork.py
index f8e4dddc7..3670dbe8a 100644
--- a/functest/core/tenantnetwork.py
+++ b/functest/core/tenantnetwork.py
@@ -61,13 +61,12 @@ class NewProject():
name_or_id=self.orig_cloud.auth.get(
"project_domain_name", "Default"))
self.project = self.orig_cloud.create_project(
- name='{}-project_{}'.format(self.case_name[:18], self.guid),
- description="Created by OPNFV Functest: {}".format(
- self.case_name),
+ name=f'{self.case_name[:18]}-project_{self.guid}',
+ description=f"Created by OPNFV Functest: {self.case_name}",
domain_id=self.domain.id)
self.__logger.debug("project: %s", self.project)
self.user = self.orig_cloud.create_user(
- name='{}-user_{}'.format(self.case_name, self.guid),
+ name=f'{self.case_name}-user_{self.guid}',
password=self.password,
domain_id=self.domain.id)
self.__logger.debug("user: %s", self.user)
@@ -77,7 +76,7 @@ class NewProject():
elif self.orig_cloud.get_role(self.default_member.lower()):
self.role_name = self.default_member.lower()
else:
- raise Exception("Cannot detect {}".format(self.default_member))
+ raise Exception(f"Cannot detect {self.default_member}")
except Exception: # pylint: disable=broad-except
self.__logger.info("Creating default role %s", self.default_member)
role = self.orig_cloud.create_role(self.default_member)
@@ -222,15 +221,15 @@ class TenantNetwork1(testcase.TestCase):
if env.get('NO_TENANT_NETWORK').lower() != 'true':
assert self.ext_net
provider = {}
- if hasattr(config.CONF, '{}_network_type'.format(self.case_name)):
+ if hasattr(config.CONF, f'{self.case_name}_network_type'):
provider["network_type"] = getattr(
- config.CONF, '{}_network_type'.format(self.case_name))
- if hasattr(config.CONF, '{}_physical_network'.format(self.case_name)):
+ config.CONF, f'{self.case_name}_network_type')
+ if hasattr(config.CONF, f'{self.case_name}_physical_network'):
provider["physical_network"] = getattr(
- config.CONF, '{}_physical_network'.format(self.case_name))
- if hasattr(config.CONF, '{}_segmentation_id'.format(self.case_name)):
+ config.CONF, f'{self.case_name}_physical_network')
+ if hasattr(config.CONF, f'{self.case_name}_segmentation_id'):
provider["segmentation_id"] = getattr(
- config.CONF, '{}_segmentation_id'.format(self.case_name))
+ config.CONF, f'{self.case_name}_segmentation_id')
domain = self.orig_cloud.get_domain(
name_or_id=self.orig_cloud.auth.get(
"project_domain_name", "Default"))
@@ -238,23 +237,23 @@ class TenantNetwork1(testcase.TestCase):
self.cloud.auth['project_name'],
domain_id=domain.id)
self.network = self.orig_cloud.create_network(
- '{}-net_{}'.format(self.case_name, self.guid),
+ f'{self.case_name}-net_{self.guid}',
provider=provider, project_id=project.id,
shared=self.shared_network)
self.__logger.debug("network: %s", self.network)
self.subnet = self.cloud.create_subnet(
self.network.id,
- subnet_name='{}-subnet_{}'.format(self.case_name, self.guid),
+ subnet_name=f'{self.case_name}-subnet_{self.guid}',
cidr=getattr(
- config.CONF, '{}_private_subnet_cidr'.format(self.case_name),
+ config.CONF, f'{self.case_name}_private_subnet_cidr',
self.cidr),
enable_dhcp=True,
dns_nameservers=[env.get('NAMESERVER')])
self.__logger.debug("subnet: %s", self.subnet)
self.router = self.cloud.create_router(
- name='{}-router_{}'.format(self.case_name, self.guid),
+ name=f'{self.case_name}-router_{self.guid}',
ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
self.__logger.debug("router: %s", self.router)
self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)
diff --git a/functest/opnfv_tests/openstack/barbican/barbican.py b/functest/opnfv_tests/openstack/barbican/barbican.py
index 0c2429e10..706304bbf 100644
--- a/functest/opnfv_tests/openstack/barbican/barbican.py
+++ b/functest/opnfv_tests/openstack/barbican/barbican.py
@@ -32,6 +32,6 @@ class Barbican(tempest.TempestCommon):
if not rconfig.has_section('image-feature-enabled'):
rconfig.add_section('image-feature-enabled')
rconfig.set('image-feature-enabled', 'api_v1', False)
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
self.backup_tempest_config(self.conf_file, self.res_dir)
diff --git a/functest/opnfv_tests/openstack/cinder/cinder_test.py b/functest/opnfv_tests/openstack/cinder/cinder_test.py
index 47bf41b8d..7d8c0a0bd 100644
--- a/functest/opnfv_tests/openstack/cinder/cinder_test.py
+++ b/functest/opnfv_tests/openstack/cinder/cinder_test.py
@@ -54,12 +54,12 @@ class CinderCheck(singlevm.SingleVm2):
def prepare(self):
super().prepare()
self.vm2 = self.boot_vm(
- '{}-vm2_{}'.format(self.case_name, self.guid),
+ f'{self.case_name}-vm2_{self.guid}',
key_name=self.keypair.id,
security_groups=[self.sec.id])
(self.fip2, self.ssh2) = self.connect(self.vm2)
self.volume = self.cloud.create_volume(
- name='{}-volume_{}'.format(self.case_name, self.guid), size='2',
+ name=f'{self.case_name}-volume_{self.guid}', size='2',
timeout=self.volume_timeout, wait=True)
def _write_data(self):
@@ -76,7 +76,7 @@ class CinderCheck(singlevm.SingleVm2):
return testcase.TestCase.EX_RUN_ERROR
self.logger.debug("ssh: %s", self.ssh)
(_, stdout, stderr) = self.ssh.exec_command(
- "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+ f"sh ~/write_data.sh {env.get('VOLUME_DEVICE_NAME')}")
self.logger.debug(
"volume_write stdout: %s", stdout.read().decode("utf-8"))
self.logger.debug(
@@ -104,7 +104,7 @@ class CinderCheck(singlevm.SingleVm2):
return testcase.TestCase.EX_RUN_ERROR
self.logger.debug("ssh: %s", self.ssh2)
(_, stdout, stderr) = self.ssh2.exec_command(
- "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+ f"sh ~/read_data.sh {env.get('VOLUME_DEVICE_NAME')}")
self.logger.debug(
"read volume stdout: %s", stdout.read().decode("utf-8"))
self.logger.debug(
diff --git a/functest/opnfv_tests/openstack/patrole/patrole.py b/functest/opnfv_tests/openstack/patrole/patrole.py
index bdf18d35e..88c42f269 100644
--- a/functest/opnfv_tests/openstack/patrole/patrole.py
+++ b/functest/opnfv_tests/openstack/patrole/patrole.py
@@ -23,6 +23,6 @@ class Patrole(tempest.TempestCommon):
if not rconfig.has_section('rbac'):
rconfig.add_section('rbac')
rconfig.set('rbac', 'rbac_test_roles', kwargs.get('roles', 'admin'))
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
self.backup_tempest_config(self.conf_file, self.res_dir)
diff --git a/functest/opnfv_tests/openstack/rally/rally.py b/functest/opnfv_tests/openstack/rally/rally.py
index 36f377ccf..3d897e25d 100644
--- a/functest/opnfv_tests/openstack/rally/rally.py
+++ b/functest/opnfv_tests/openstack/rally/rally.py
@@ -145,7 +145,7 @@ class RallyBase(singlevm.VmReady2):
def _prepare_test_list(self, test_name):
"""Build the list of test cases to be executed."""
- test_yaml_file_name = 'opnfv-{}.yaml'.format(test_name)
+ test_yaml_file_name = f'opnfv-{test_name}.yaml'
scenario_file_name = os.path.join(self.rally_scenario_dir,
test_yaml_file_name)
@@ -154,8 +154,8 @@ class RallyBase(singlevm.VmReady2):
test_yaml_file_name)
if not os.path.exists(scenario_file_name):
- raise Exception("The scenario '%s' does not exist."
- % scenario_file_name)
+ raise Exception(
+ f"The scenario '{scenario_file_name}' does not exist.")
LOGGER.debug('Scenario fetched from : %s', scenario_file_name)
test_file_name = os.path.join(self.temp_dir, test_yaml_file_name)
@@ -190,7 +190,9 @@ class RallyBase(singlevm.VmReady2):
if pod_arch and pod_arch in arch_filter:
LOGGER.info("Apply aarch64 specific to rally config...")
- with open(RallyBase.rally_aar4_patch_path, "r") as pfile:
+ with open(
+ RallyBase.rally_aar4_patch_path, "r",
+ encoding='utf-8') as pfile:
rally_patch_conf = pfile.read()
for line in fileinput.input(RallyBase.rally_conf_path):
@@ -228,7 +230,7 @@ class RallyBase(singlevm.VmReady2):
rconfig.add_section('openstack')
rconfig.set(
'openstack', 'keystone_default_role', env.get("NEW_USER_ROLE"))
- with open(rally_conf, 'w') as config_file:
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
@staticmethod
@@ -239,7 +241,7 @@ class RallyBase(singlevm.VmReady2):
rconfig.read(rally_conf)
if rconfig.has_option('openstack', 'keystone_default_role'):
rconfig.remove_option('openstack', 'keystone_default_role')
- with open(rally_conf, 'w') as config_file:
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
@staticmethod
@@ -291,7 +293,9 @@ class RallyBase(singlevm.VmReady2):
"""Exclude scenario."""
black_tests = []
try:
- with open(RallyBase.blacklist_file, 'r') as black_list_file:
+ with open(
+ RallyBase.blacklist_file, 'r',
+ encoding='utf-8') as black_list_file:
black_list_yaml = yaml.safe_load(black_list_file)
deploy_scenario = env.get('DEPLOY_SCENARIO')
@@ -335,7 +339,9 @@ class RallyBase(singlevm.VmReady2):
func_list = []
try:
- with open(RallyBase.blacklist_file, 'r') as black_list_file:
+ with open(
+ RallyBase.blacklist_file, 'r',
+ encoding='utf-8') as black_list_file:
black_list_yaml = yaml.safe_load(black_list_file)
if env.get('BLOCK_MIGRATION').lower() == 'true':
@@ -362,8 +368,8 @@ class RallyBase(singlevm.VmReady2):
def apply_blacklist(self, case_file_name, result_file_name):
"""Apply blacklist."""
LOGGER.debug("Applying blacklist...")
- with open(case_file_name, 'r') as cases_file, open(
- result_file_name, 'w') as result_file:
+ with open(case_file_name, 'r', encoding='utf-8') as cases_file, open(
+ result_file_name, 'w', encoding='utf-8') as result_file:
black_tests = list(set(self.excl_func() + self.excl_scenario()))
if black_tests:
LOGGER.debug("Blacklisted tests: %s", str(black_tests))
@@ -408,7 +414,7 @@ class RallyBase(singlevm.VmReady2):
LOGGER.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
# save report as JSON
- report_json_name = '{}.json'.format(test_name)
+ report_json_name = f'{test_name}.json'
report_json_dir = os.path.join(self.results_dir, report_json_name)
cmd = (["rally", "task", "report", "--json", "--uuid", task_id,
"--out", report_json_dir])
@@ -416,7 +422,7 @@ class RallyBase(singlevm.VmReady2):
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOGGER.info("%s\n%s", " ".join(cmd), output.decode("utf-8"))
- with open(report_json_dir) as json_file:
+ with open(report_json_dir, encoding='utf-8') as json_file:
json_results = json_file.read()
self._append_summary(json_results, test_name)
@@ -492,7 +498,7 @@ class RallyBase(singlevm.VmReady2):
if test in self.stests:
self.tests.append(test)
else:
- raise Exception("Test name '%s' is invalid" % test)
+ raise Exception(f"Test name '{test}' is invalid")
if not os.path.exists(self.task_dir):
os.makedirs(self.task_dir)
@@ -500,16 +506,14 @@ class RallyBase(singlevm.VmReady2):
task = os.path.join(self.rally_dir, 'task.yaml')
if not os.path.exists(task):
LOGGER.error("Task file '%s' does not exist.", task)
- raise Exception("Task file '{}' does not exist.".
- format(task))
+ raise Exception(f"Task file '{task}' does not exist.")
self.task_file = os.path.join(self.task_dir, 'task.yaml')
shutil.copyfile(task, self.task_file)
task_macro = os.path.join(self.rally_dir, 'macro')
if not os.path.exists(task_macro):
LOGGER.error("Task macro dir '%s' does not exist.", task_macro)
- raise Exception("Task macro dir '{}' does not exist.".
- format(task_macro))
+ raise Exception(f"Task macro dir '{task_macro}' does not exist.")
macro_dir = os.path.join(self.task_dir, 'macro')
if os.path.exists(macro_dir):
shutil.rmtree(macro_dir)
@@ -570,7 +574,7 @@ class RallyBase(singlevm.VmReady2):
success_avg = 100 * item['nb_success'] / item['nb_tests']
except ZeroDivisionError:
success_avg = 0
- success_str = str("{:0.2f}".format(success_avg)) + '%'
+ success_str = f"{success_avg:0.2f}%"
duration_str = time.strftime("%H:%M:%S",
time.gmtime(item['overall_duration']))
res_table.add_row([item['test_name'], duration_str,
@@ -588,7 +592,7 @@ class RallyBase(singlevm.VmReady2):
self.result = 100 * total_nb_success / total_nb_tests
except ZeroDivisionError:
self.result = 100
- success_rate = "{:0.2f}".format(self.result)
+ success_rate = f"{self.result:0.2f}"
success_rate_str = str(success_rate) + '%'
res_table.add_row(["", "", "", ""])
res_table.add_row(["TOTAL:", total_duration_str, total_nb_tests,
@@ -664,7 +668,7 @@ class RallyBase(singlevm.VmReady2):
rconfig.set('DEFAULT', 'use_stderr', False)
rconfig.set('DEFAULT', 'log-file', 'rally.log')
rconfig.set('DEFAULT', 'log_dir', res_dir)
- with open(rally_conf, 'w') as config_file:
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
@staticmethod
@@ -680,7 +684,7 @@ class RallyBase(singlevm.VmReady2):
rconfig.remove_option('DEFAULT', 'log-file')
if rconfig.has_option('DEFAULT', 'log_dir'):
rconfig.remove_option('DEFAULT', 'log_dir')
- with open(rally_conf, 'w') as config_file:
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def run(self, **kwargs):
@@ -695,9 +699,9 @@ class RallyBase(singlevm.VmReady2):
self.run_tests(**kwargs)
self._generate_report()
self.export_task(
- "{}/{}.html".format(self.results_dir, self.case_name))
+ f"{self.results_dir}/{self.case_name}.html")
self.export_task(
- "{}/{}.xml".format(self.results_dir, self.case_name),
+ f"{self.results_dir}/{self.case_name}.xml",
export_type="junit-xml")
res = testcase.TestCase.EX_OK
except Exception: # pylint: disable=broad-except
@@ -751,14 +755,14 @@ class RallyJobs(RallyBase):
def prepare_run(self, **kwargs):
"""Create resources needed by test scenarios."""
super().prepare_run(**kwargs)
- with open(os.path.join(self.rally_dir,
- 'rally_jobs.yaml'), 'r') as task_file:
+ with open(
+ os.path.join(self.rally_dir, 'rally_jobs.yaml'),
+ 'r', encoding='utf-8') as task_file:
self.task_yaml = yaml.safe_load(task_file)
for task in self.task_yaml:
if task not in self.tests:
- raise Exception("Test '%s' not in '%s'" %
- (task, self.tests))
+ raise Exception(f"Test '{task}' not in '{self.tests}'")
def apply_blacklist(self, case_file_name, result_file_name):
# pylint: disable=too-many-branches
@@ -770,7 +774,7 @@ class RallyJobs(RallyBase):
LOGGER.debug("Blacklisted tests: %s", str(black_tests))
template = YAML(typ='jinja2')
- with open(case_file_name, 'r') as fname:
+ with open(case_file_name, 'r', encoding='utf-8') as fname:
cases = template.load(fname)
if cases.get("version", 1) == 1:
# scenarios in dictionary
@@ -800,7 +804,7 @@ class RallyJobs(RallyBase):
cases['subtasks'].pop(sind)
break
- with open(result_file_name, 'w') as fname:
+ with open(result_file_name, 'w', encoding='utf-8') as fname:
template.dump(cases, fname)
def build_task_args(self, test_name):
@@ -821,7 +825,7 @@ class RallyJobs(RallyBase):
task_name = self.task_yaml.get(test_name).get("task")
task = os.path.join(jobs_dir, task_name)
if not os.path.exists(task):
- raise Exception("The scenario '%s' does not exist." % task)
+ raise Exception(f"The scenario '{task}' does not exist.")
LOGGER.debug('Scenario fetched from : %s', task)
if not os.path.exists(self.temp_dir):
diff --git a/functest/opnfv_tests/openstack/refstack/refstack.py b/functest/opnfv_tests/openstack/refstack/refstack.py
index faf183f76..87932020b 100644
--- a/functest/opnfv_tests/openstack/refstack/refstack.py
+++ b/functest/opnfv_tests/openstack/refstack/refstack.py
@@ -26,12 +26,11 @@ class Refstack(tempest.TempestCommon):
def _extract_refstack_data(self, refstack_list):
yaml_data = ""
- with open(refstack_list) as def_file:
+ with open(refstack_list, encoding='utf-8') as def_file:
for line in def_file:
try:
grp = re.search(r'^([^\[]*)(\[.*\])\n*$', line)
- yaml_data = "{}\n{}: {}".format(
- yaml_data, grp.group(1), grp.group(2))
+ yaml_data = f"{yaml_data}\n{grp.group(1)}: {grp.group(2)}"
except Exception: # pylint: disable=broad-except
self.__logger.warning("Cannot parse %s", line)
return yaml.full_load(yaml_data)
@@ -53,8 +52,7 @@ class Refstack(tempest.TempestCommon):
for line in output.splitlines():
try:
grp = re.search(r'^([^\[]*)(\[.*\])\n*$', line.decode("utf-8"))
- yaml_data2 = "{}\n{}: {}".format(
- yaml_data2, grp.group(1), grp.group(2))
+ yaml_data2 = f"{yaml_data2}\n{grp.group(1)}: {grp.group(2)}"
except Exception: # pylint: disable=broad-except
self.__logger.warning("Cannot parse %s. skipping it", line)
return yaml.full_load(yaml_data2)
@@ -62,11 +60,11 @@ class Refstack(tempest.TempestCommon):
def generate_test_list(self, **kwargs):
refstack_list = os.path.join(
getattr(config.CONF, 'dir_refstack_data'),
- "{}.txt".format(kwargs.get('target', 'compute')))
+ f"{kwargs.get('target', 'compute')}.txt")
self.backup_tempest_config(self.conf_file, '/etc')
refstack_data = self._extract_refstack_data(refstack_list)
tempest_data = self._extract_tempest_data()
- with open(self.list, 'w') as ref_file:
+ with open(self.list, 'w', encoding='utf-8') as ref_file:
for key in refstack_data.keys():
try:
for data in tempest_data[key]:
@@ -75,9 +73,9 @@ class Refstack(tempest.TempestCommon):
else:
self.__logger.info("%s: ids differ. skipping it", key)
continue
- ref_file.write("{}{}\n".format(
- key, str(tempest_data[key]).replace(
- "'", "").replace(", ", ",")))
+ value = str(tempest_data[key]).replace(
+ "'", "").replace(", ", ",")
+ ref_file.write(f"{key}{value}\n")
except Exception: # pylint: disable=broad-except
self.__logger.info("%s: not found. skipping it", key)
continue
diff --git a/functest/opnfv_tests/openstack/shaker/shaker.py b/functest/opnfv_tests/openstack/shaker/shaker.py
index 7d9922060..275cc3077 100644
--- a/functest/opnfv_tests/openstack/shaker/shaker.py
+++ b/functest/opnfv_tests/openstack/shaker/shaker.py
@@ -95,33 +95,31 @@ class Shaker(singlevm.SingleVm2):
scpc.put('/home/opnfv/functest/conf/env_file', remote_path='~/')
if os.environ.get('OS_CACERT'):
scpc.put(os.environ.get('OS_CACERT'), remote_path='~/os_cacert')
+ opt = 'export OS_CACERT=~/os_cacert && ' if os.environ.get(
+ 'OS_CACERT') else ''
(_, stdout, stderr) = self.ssh.exec_command(
'source ~/env_file && '
'export OS_INTERFACE=public && '
- 'export OS_AUTH_URL={} && '
- 'export OS_USERNAME={} && '
- 'export OS_PROJECT_NAME={} && '
- 'export OS_PROJECT_ID={} && '
+ f'export OS_AUTH_URL={endpoint} && '
+ f'export OS_USERNAME={self.project.user.name} && '
+ f'export OS_PROJECT_NAME={self.project.project.name} && '
+ f'export OS_PROJECT_ID={self.project.project.id} && '
'unset OS_TENANT_NAME && '
'unset OS_TENANT_ID && '
'unset OS_ENDPOINT_TYPE && '
- 'export OS_PASSWORD="{}" && '
- '{}'
+ f'export OS_PASSWORD="{self.project.password}" && '
+ f'{opt}'
'env && '
- 'timeout {} shaker --debug --image-name {} --flavor-name {} '
- '--server-endpoint {}:9000 --external-net {} --dns-nameservers {} '
+ f'timeout {self.shaker_timeout} shaker --debug '
+ f'--image-name {self.image.name} --flavor-name {self.flavor.name} '
+ f'--server-endpoint {self.fip.floating_ip_address}:9000 '
+ f'--external-net {self.ext_net.id} '
+ f"--dns-nameservers {env.get('NAMESERVER')} "
'--scenario openstack/full_l2,'
'openstack/full_l3_east_west,'
'openstack/full_l3_north_south,'
'openstack/perf_l3_north_south '
- '--report report.html --output report.json'.format(
- endpoint, self.project.user.name, self.project.project.name,
- self.project.project.id, self.project.password,
- 'export OS_CACERT=~/os_cacert && ' if os.environ.get(
- 'OS_CACERT') else '',
- self.shaker_timeout, self.image.name, self.flavor.name,
- self.fip.floating_ip_address, self.ext_net.id,
- env.get('NAMESERVER')))
+ '--report report.html --output report.json')
self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
self.__logger.info("error:\n%s", stderr.read().decode("utf-8"))
if not os.path.exists(self.res_dir):
@@ -132,7 +130,9 @@ class Shaker(singlevm.SingleVm2):
except scp.SCPException:
self.__logger.exception("cannot get report files")
return 1
- with open(os.path.join(self.res_dir, 'report.json')) as json_file:
+ with open(
+ os.path.join(self.res_dir, 'report.json'),
+ encoding='utf-8') as json_file:
data = json.load(json_file)
for value in data["records"].values():
if value["status"] != "ok":
diff --git a/functest/opnfv_tests/openstack/tempest/tempest.py b/functest/opnfv_tests/openstack/tempest/tempest.py
index 60224019c..808ccba33 100644
--- a/functest/opnfv_tests/openstack/tempest/tempest.py
+++ b/functest/opnfv_tests/openstack/tempest/tempest.py
@@ -128,7 +128,7 @@ class TempestCommon(singlevm.VmReady2):
@staticmethod
def read_file(filename):
"""Read file and return content as a stripped list."""
- with open(filename) as src:
+ with open(filename, encoding='utf-8') as src:
return [line.strip() for line in src.readlines()]
@staticmethod
@@ -212,7 +212,7 @@ class TempestCommon(singlevm.VmReady2):
"""
return os.path.join(getattr(config.CONF, 'dir_rally_inst'),
'verification',
- 'verifier-{}'.format(verifier_id),
+ f'verifier-{verifier_id}',
'repo')
@staticmethod
@@ -222,13 +222,13 @@ class TempestCommon(singlevm.VmReady2):
"""
return os.path.join(getattr(config.CONF, 'dir_rally_inst'),
'verification',
- 'verifier-{}'.format(verifier_id),
- 'for-deployment-{}'.format(deployment_id))
+ f'verifier-{verifier_id}',
+ f'for-deployment-{deployment_id}')
@staticmethod
def update_tempest_conf_file(conf_file, rconfig):
"""Update defined paramters into tempest config file"""
- with open(TempestCommon.tempest_conf_yaml) as yfile:
+ with open(TempestCommon.tempest_conf_yaml, encoding='utf-8') as yfile:
conf_yaml = yaml.safe_load(yfile)
if conf_yaml:
sections = rconfig.sections()
@@ -239,7 +239,7 @@ class TempestCommon(singlevm.VmReady2):
for key, value in sub_conf.items():
rconfig.set(section, key, value)
- with open(conf_file, 'w') as config_file:
+ with open(conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
@staticmethod
@@ -324,13 +324,13 @@ class TempestCommon(singlevm.VmReady2):
shutil.copyfile(
self.tempest_custom, self.list)
else:
- raise Exception("Tempest test list file %s NOT found."
- % self.tempest_custom)
+ raise Exception(
+ f"Tempest test list file {self.tempest_custom} NOT found.")
else:
testr_mode = kwargs.get(
'mode', r'^tempest\.(api|scenario).*\[.*\bsmoke\b.*\]$')
- cmd = "(cd {0}; stestr list '{1}' >{2} 2>/dev/null)".format(
- self.verifier_repo_dir, testr_mode, self.list)
+ cmd = (f"(cd {self.verifier_repo_dir}; "
+ f"stestr list '{testr_mode}' > {self.list} 2>/dev/null)")
output = subprocess.check_output(cmd, shell=True)
LOGGER.info("%s\n%s", cmd, output.decode("utf-8"))
os.remove('/etc/tempest.conf')
@@ -342,13 +342,13 @@ class TempestCommon(singlevm.VmReady2):
os.remove(self.raw_list)
os.rename(self.list, self.raw_list)
cases_file = self.read_file(self.raw_list)
- with open(self.list, 'w') as result_file:
+ with open(self.list, 'w', encoding='utf-8') as result_file:
black_tests = []
try:
deploy_scenario = env.get('DEPLOY_SCENARIO')
if bool(deploy_scenario):
# if DEPLOY_SCENARIO is set we read the file
- with open(black_list) as black_list_file:
+ with open(black_list, encoding='utf-8') as black_list_file:
black_list_yaml = yaml.safe_load(black_list_file)
black_list_file.close()
for item in black_list_yaml:
@@ -376,12 +376,11 @@ class TempestCommon(singlevm.VmReady2):
LOGGER.info("Starting Tempest test suite: '%s'.", cmd)
with open(
- os.path.join(self.res_dir, "tempest.log"), 'w+') as f_stdout:
-
+ os.path.join(self.res_dir, "tempest.log"), 'w+',
+ encoding='utf-8') as f_stdout:
with subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1) as proc:
-
with proc.stdout:
for line in iter(proc.stdout.readline, b''):
if re.search(r"\} tempest\.", line.decode("utf-8")):
@@ -399,8 +398,8 @@ class TempestCommon(singlevm.VmReady2):
LOGGER.info('Verification UUID: %s', self.verification_id)
shutil.copy(
- "{}/tempest.log".format(self.deployment_dir),
- "{}/tempest.debug.log".format(self.res_dir))
+ f"{self.deployment_dir}/tempest.log",
+ f"{self.res_dir}/tempest.debug.log")
def parse_verifier_result(self):
"""Parse and save test results."""
@@ -417,8 +416,8 @@ class TempestCommon(singlevm.VmReady2):
LOGGER.error("No test has been executed")
return
- with open(os.path.join(self.res_dir,
- "rally.log"), 'r') as logfile:
+ with open(os.path.join(self.res_dir, "rally.log"),
+ 'r', encoding='utf-8') as logfile:
output = logfile.read()
success_testcases = []
@@ -453,9 +452,8 @@ class TempestCommon(singlevm.VmReady2):
rconfig.read(rally_conf)
if not rconfig.has_section('openstack'):
rconfig.add_section('openstack')
- rconfig.set('openstack', 'img_name_regex', '^{}$'.format(
- self.image.name))
- with open(rally_conf, 'w') as config_file:
+ rconfig.set('openstack', 'img_name_regex', f'^{self.image.name}$')
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_default_role(self, rally_conf='/etc/rally/rally.conf'):
@@ -468,7 +466,7 @@ class TempestCommon(singlevm.VmReady2):
if not rconfig.has_section('openstack'):
rconfig.add_section('openstack')
rconfig.set('openstack', 'swift_operator_role', role.name)
- with open(rally_conf, 'w') as config_file:
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
@staticmethod
@@ -480,7 +478,7 @@ class TempestCommon(singlevm.VmReady2):
rconfig.remove_option('openstack', 'img_name_regex')
if rconfig.has_option('openstack', 'swift_operator_role'):
rconfig.remove_option('openstack', 'swift_operator_role')
- with open(rally_conf, 'w') as config_file:
+ with open(rally_conf, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_auth_section(self):
@@ -503,11 +501,11 @@ class TempestCommon(singlevm.VmReady2):
account_file = os.path.join(
getattr(config.CONF, 'dir_functest_data'), 'accounts.yaml')
assert os.path.exists(
- account_file), "{} doesn't exist".format(account_file)
+ account_file), f"{account_file} doesn't exist"
rconfig.set('auth', 'test_accounts_file', account_file)
if env.get('NO_TENANT_NETWORK').lower() == 'true':
rconfig.set('auth', 'create_isolated_networks', False)
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_network_section(self):
@@ -524,7 +522,7 @@ class TempestCommon(singlevm.VmReady2):
if not rconfig.has_section('network-feature-enabled'):
rconfig.add_section('network-feature-enabled')
rconfig.set('network-feature-enabled', 'floating_ips', False)
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_compute_section(self):
@@ -536,7 +534,7 @@ class TempestCommon(singlevm.VmReady2):
rconfig.set(
'compute', 'fixed_network_name',
self.network.name if self.network else env.get("EXTERNAL_NETWORK"))
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_validation_section(self):
@@ -551,7 +549,7 @@ class TempestCommon(singlevm.VmReady2):
rconfig.set(
'validation', 'network_for_ssh',
self.network.name if self.network else env.get("EXTERNAL_NETWORK"))
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_scenario_section(self):
@@ -559,12 +557,12 @@ class TempestCommon(singlevm.VmReady2):
rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
filename = getattr(
- config.CONF, '{}_image'.format(self.case_name), self.filename)
+ config.CONF, f'{self.case_name}_image', self.filename)
if not rconfig.has_section('scenario'):
rconfig.add_section('scenario')
rconfig.set('scenario', 'img_file', filename)
rconfig.set('scenario', 'img_disk_format', getattr(
- config.CONF, '{}_image_format'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_format',
self.image_format))
extra_properties = self.extra_properties.copy()
if env.get('IMAGE_PROPERTIES'):
@@ -572,12 +570,11 @@ class TempestCommon(singlevm.VmReady2):
functest_utils.convert_ini_to_dict(
env.get('IMAGE_PROPERTIES')))
extra_properties.update(
- getattr(config.CONF, '{}_extra_properties'.format(
- self.case_name), {}))
+ getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
rconfig.set(
'scenario', 'img_properties',
functest_utils.convert_dict_to_ini(extra_properties))
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def update_dashboard_section(self):
@@ -590,7 +587,7 @@ class TempestCommon(singlevm.VmReady2):
rconfig.set('dashboard', 'dashboard_url', env.get('DASHBOARD_URL'))
else:
rconfig.set('service_available', 'horizon', False)
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
def configure(self, **kwargs): # pylint: disable=unused-argument
@@ -706,7 +703,7 @@ class TempestHeat(TempestCommon):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.user2 = self.orig_cloud.create_user(
- name='{}-user2_{}'.format(self.case_name, self.project.guid),
+ name=f'{self.case_name}-user2_{self.project.guid}',
password=self.project.password,
domain_id=self.project.domain.id)
self.orig_cloud.grant_role(
@@ -764,7 +761,7 @@ class TempestHeat(TempestCommon):
env.get("EXTERNAL_NETWORK"))
rconfig.set(
'heat_plugin', 'network_for_ssh', env.get("EXTERNAL_NETWORK"))
- with open(self.conf_file, 'w') as config_file:
+ with open(self.conf_file, 'w', encoding='utf-8') as config_file:
rconfig.write(config_file)
self.backup_tempest_config(self.conf_file, self.res_dir)
diff --git a/functest/opnfv_tests/openstack/vmtp/vmtp.py b/functest/opnfv_tests/openstack/vmtp/vmtp.py
index b0e6ff427..9833cc72a 100644
--- a/functest/opnfv_tests/openstack/vmtp/vmtp.py
+++ b/functest/opnfv_tests/openstack/vmtp/vmtp.py
@@ -57,7 +57,7 @@ class Vmtp(singlevm.VmReady2):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmtp'
super().__init__(**kwargs)
- self.config = "{}/vmtp.conf".format(self.res_dir)
+ self.config = f"{self.res_dir}/vmtp.conf"
(_, self.privkey_filename) = tempfile.mkstemp()
(_, self.pubkey_filename) = tempfile.mkstemp()
@@ -77,7 +77,7 @@ class Vmtp(singlevm.VmReady2):
assert self.cloud
assert self.ext_net
self.router = self.cloud.create_router(
- name='{}-router_{}'.format(self.case_name, self.guid),
+ name=f'{self.case_name}-router_{self.guid}',
ext_gateway_net_id=self.ext_net.id)
self.__logger.debug("router: %s", self.router)
@@ -87,13 +87,13 @@ class Vmtp(singlevm.VmReady2):
Raises: Exception on error
"""
assert self.cloud
- name = "vmtp_{}".format(self.guid)
+ name = f"vmtp_{self.guid}"
self.__logger.info("Creating keypair with name: '%s'", name)
keypair = self.cloud.create_keypair(name)
self.__logger.debug("keypair: %s", keypair)
- with open(self.privkey_filename, 'w') as key_file:
+ with open(self.privkey_filename, 'w', encoding='utf-8') as key_file:
key_file.write(keypair.private_key)
- with open(self.pubkey_filename, 'w') as key_file:
+ with open(self.pubkey_filename, 'w', encoding='utf-8') as key_file:
key_file.write(keypair.public_key)
self.cloud.delete_keypair(keypair.id)
@@ -108,7 +108,7 @@ class Vmtp(singlevm.VmReady2):
cmd = ['vmtp', '-sc']
output = subprocess.check_output(cmd).decode("utf-8")
self.__logger.info("%s\n%s", " ".join(cmd), output)
- with open(self.config, "w+") as conf:
+ with open(self.config, "w+", encoding='utf-8') as conf:
vmtp_conf = yaml.full_load(output)
vmtp_conf["private_key_file"] = self.privkey_filename
vmtp_conf["public_key_file"] = self.pubkey_filename
@@ -116,12 +116,11 @@ class Vmtp(singlevm.VmReady2):
vmtp_conf["router_name"] = str(self.router.name)
vmtp_conf["flavor_type"] = str(self.flavor.name)
vmtp_conf["internal_network_name"] = [
- "pns-internal-net_{}".format(self.guid),
- "pns-internal-net2_{}".format(self.guid)]
- vmtp_conf["vm_name_client"] = "TestClient_{}".format(self.guid)
- vmtp_conf["vm_name_server"] = "TestServer_{}".format(self.guid)
- vmtp_conf["security_group_name"] = "pns-security{}".format(
- self.guid)
+ f"pns-internal-net_{self.guid}",
+ f"pns-internal-net2_{self.guid}"]
+ vmtp_conf["vm_name_client"] = f"TestClient_{self.guid}"
+ vmtp_conf["vm_name_server"] = f"TestServer_{self.guid}"
+ vmtp_conf["security_group_name"] = f"pns-security{self.guid}"
vmtp_conf["dns_nameservers"] = [env.get('NAMESERVER')]
vmtp_conf["generic_retry_count"] = self.create_server_timeout // 2
vmtp_conf["ssh_retry_count"] = self.ssh_retry_timeout // 2
@@ -143,13 +142,13 @@ class Vmtp(singlevm.VmReady2):
OS_USER_DOMAIN_NAME=self.project.domain.name,
OS_PASSWORD=self.project.password)
if not new_env["OS_AUTH_URL"].endswith(('v3', 'v3/')):
- new_env["OS_AUTH_URL"] = "{}/v3".format(new_env["OS_AUTH_URL"])
+ new_env["OS_AUTH_URL"] = f'{new_env["OS_AUTH_URL"]}/v3'
try:
del new_env['OS_TENANT_NAME']
del new_env['OS_TENANT_ID']
except Exception: # pylint: disable=broad-except
pass
- cmd = ['vmtp', '-d', '--json', '{}/vmtp.json'.format(self.res_dir),
+ cmd = ['vmtp', '-d', '--json', f'{self.res_dir}/vmtp.json',
'-c', self.config]
if env.get("VMTP_HYPERVISORS"):
hypervisors = functest_utils.convert_ini_to_list(
@@ -160,12 +159,13 @@ class Vmtp(singlevm.VmReady2):
output = subprocess.check_output(
cmd, stderr=subprocess.STDOUT, env=new_env).decode("utf-8")
self.__logger.info("%s\n%s", " ".join(cmd), output)
- cmd = ['vmtp_genchart', '-c', '{}/vmtp.html'.format(self.res_dir),
- '{}/vmtp.json'.format(self.res_dir)]
+ cmd = ['vmtp_genchart', '-c', f'{self.res_dir}/vmtp.html',
+ f'{self.res_dir}/vmtp.json']
output = subprocess.check_output(
cmd, stderr=subprocess.STDOUT).decode("utf-8")
self.__logger.info("%s\n%s", " ".join(cmd), output)
- with open('{}/vmtp.json'.format(self.res_dir), 'r') as res_file:
+ with open(f'{self.res_dir}/vmtp.json', 'r',
+ encoding='utf-8') as res_file:
self.details = json.load(res_file)
def run(self, **kwargs):
@@ -207,7 +207,7 @@ class Vmtp(singlevm.VmReady2):
super().clean()
os.remove(self.privkey_filename)
os.remove(self.pubkey_filename)
- self.cloud.delete_network("pns-internal-net_{}".format(self.guid))
- self.cloud.delete_network("pns-internal-net2_{}".format(self.guid))
+ self.cloud.delete_network(f"pns-internal-net_{self.guid}")
+ self.cloud.delete_network(f"pns-internal-net2_{self.guid}")
except Exception: # pylint: disable=broad-except
pass
diff --git a/functest/opnfv_tests/openstack/vping/vping_ssh.py b/functest/opnfv_tests/openstack/vping/vping_ssh.py
index 2eb040b0e..ad64348c4 100644
--- a/functest/opnfv_tests/openstack/vping/vping_ssh.py
+++ b/functest/opnfv_tests/openstack/vping/vping_ssh.py
@@ -35,7 +35,7 @@ class VPingSSH(singlevm.SingleVm2):
def prepare(self):
super().prepare()
self.vm2 = self.boot_vm(
- '{}-vm2_{}'.format(self.case_name, self.guid),
+ f'{self.case_name}-vm2_{self.guid}',
security_groups=[self.sec.id])
def execute(self):
@@ -46,10 +46,9 @@ class VPingSSH(singlevm.SingleVm2):
assert self.ssh
if not self.check_regex_in_console(self.vm2.name):
return 1
- (_, stdout, stderr) = self.ssh.exec_command(
- 'ping -c 1 {}'.format(
- self.vm2.private_v4 or self.vm2.addresses[
- self.network.name][0].addr))
+ ip4 = self.vm2.private_v4 or self.vm2.addresses[
+ self.network.name][0].addr
+ (_, stdout, stderr) = self.ssh.exec_command(f'ping -c 1 {ip4}')
self.__logger.info("output:\n%s", stdout.read().decode("utf-8"))
self.__logger.info("error:\n%s", stderr.read().decode("utf-8"))
return stdout.channel.recv_exit_status()
diff --git a/functest/opnfv_tests/openstack/vping/vping_userdata.py b/functest/opnfv_tests/openstack/vping/vping_userdata.py
index 5b2308ec1..7de70f677 100644
--- a/functest/opnfv_tests/openstack/vping/vping_userdata.py
+++ b/functest/opnfv_tests/openstack/vping/vping_userdata.py
@@ -44,7 +44,7 @@ class VPingUserdata(singlevm.VmReady2):
self.result = 0
self.vm1 = self.boot_vm()
self.vm2 = self.boot_vm(
- '{}-vm2_{}'.format(self.case_name, self.guid),
+ f'{self.case_name}-vm2_{self.guid}',
userdata=self._get_userdata())
result = self._do_vping()
@@ -106,11 +106,13 @@ class VPingUserdata(singlevm.VmReady2):
:param test_ip: the IP value to substitute into the script
:return: the bash script contents
"""
+ ip4 = self.vm1.private_v4 or self.vm1.addresses[
+ self.network.name][0].addr
if self.vm1.private_v4 or self.vm1.addresses[
self.network.name][0].addr:
return ("#!/bin/sh\n\n"
"while true; do\n"
- " ping -c 1 %s 2>&1 >/dev/null\n"
+ f" ping -c 1 {ip4} 2>&1 >/dev/null\n"
" RES=$?\n"
" if [ \"Z$RES\" = \"Z0\" ] ; then\n"
" echo 'vPing OK'\n"
@@ -119,9 +121,7 @@ class VPingUserdata(singlevm.VmReady2):
" echo 'vPing KO'\n"
" fi\n"
" sleep 1\n"
- "done\n" % str(
- self.vm1.private_v4 or self.vm1.addresses[
- self.network.name][0].addr))
+ "done\n")
return None
def clean(self):
diff --git a/functest/opnfv_tests/sdn/odl/odl.py b/functest/opnfv_tests/sdn/odl/odl.py
index b8c56b1d3..72c38ce2c 100644
--- a/functest/opnfv_tests/sdn/odl/odl.py
+++ b/functest/opnfv_tests/sdn/odl/odl.py
@@ -66,10 +66,10 @@ class ODLTests(robotframework.RobotFramework):
try:
for line in fileinput.input(cls.odl_variables_file,
inplace=True):
- print(re.sub("@{AUTH}.*",
- "@{{AUTH}} {} {}".format(
- odlusername, odlpassword),
- line.rstrip()))
+ print(re.sub(
+ "@{AUTH}.*",
+ f"@{{AUTH}} {odlusername} {odlpassword}",
+ line.rstrip()))
return True
except Exception: # pylint: disable=broad-except
cls.__logger.exception("Cannot set ODL creds:")
@@ -111,9 +111,8 @@ class ODLTests(robotframework.RobotFramework):
odlusername = kwargs['odlusername']
odlpassword = kwargs['odlpassword']
osauthurl = kwargs['osauthurl']
- keystoneurl = "{}://{}".format(
- urllib.parse.urlparse(osauthurl).scheme,
- urllib.parse.urlparse(osauthurl).netloc)
+ keystoneurl = (f"{urllib.parse.urlparse(osauthurl).scheme}://"
+ f"{urllib.parse.urlparse(osauthurl).netloc}")
variable = ['KEYSTONEURL:' + keystoneurl,
'NEUTRONURL:' + kwargs['neutronurl'],
'OS_AUTH_URL:"' + osauthurl + '"',
diff --git a/functest/opnfv_tests/vnf/epc/juju_epc.py b/functest/opnfv_tests/vnf/epc/juju_epc.py
index f5557f592..1cf240b80 100644
--- a/functest/opnfv_tests/vnf/epc/juju_epc.py
+++ b/functest/opnfv_tests/vnf/epc/juju_epc.py
@@ -90,7 +90,7 @@ class JujuEpc(singlevm.SingleVm2):
'functest', 'opnfv_tests/vnf/epc')
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
+ config.CONF, f'vnf_{self.case_name}_config')
except Exception as exc:
raise Exception("VNF config file not found") from exc
self.config_file = os.path.join(self.case_dir, self.config)
@@ -138,7 +138,7 @@ class JujuEpc(singlevm.SingleVm2):
try:
self.public_auth_url = self.get_public_auth_url(self.orig_cloud)
if not self.public_auth_url.endswith(('v3', 'v3/')):
- self.public_auth_url = "{}/v3".format(self.public_auth_url)
+ self.public_auth_url = f"{self.public_auth_url}/v3"
except Exception: # pylint: disable=broad-except
self.public_auth_url = None
self.sec = None
@@ -168,7 +168,7 @@ class JujuEpc(singlevm.SingleVm2):
'url': self.public_auth_url,
'region': self.cloud.region_name if self.cloud.region_name else (
'RegionOne')}
- with open(clouds_yaml, 'w') as yfile:
+ with open(clouds_yaml, 'w', encoding='utf-8') as yfile:
yfile.write(CLOUD_TEMPLATE.format(**cloud_data))
scpc = scp.SCPClient(self.ssh.get_transport())
scpc.put(clouds_yaml, remote_path='~/')
@@ -189,7 +189,7 @@ class JujuEpc(singlevm.SingleVm2):
"project_domain_name", "Default"),
'user_domain_n': self.cloud.auth.get(
"user_domain_name", "Default")}
- with open(credentials_yaml, 'w') as yfile:
+ with open(credentials_yaml, 'w', encoding='utf-8') as yfile:
yfile.write(CREDS_TEMPLATE.format(**creds_data))
scpc = scp.SCPClient(self.ssh.get_transport())
scpc.put(credentials_yaml, remote_path='~/')
@@ -205,8 +205,8 @@ class JujuEpc(singlevm.SingleVm2):
'RegionOne')
(_, stdout, stderr) = self.ssh.exec_command(
'/snap/bin/juju metadata generate-image -d /home/ubuntu '
- '-i {} -s xenial -r {} -u {}'.format(
- self.image.id, region_name, self.public_auth_url))
+ f'-i {self.image.id} -s xenial -r {region_name} '
+ f'-u {self.public_auth_url}')
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
return not stdout.channel.recv_exit_status()
@@ -217,8 +217,8 @@ class JujuEpc(singlevm.SingleVm2):
'RegionOne')
(_, stdout, stderr) = self.ssh.exec_command(
'/snap/bin/juju metadata generate-image -d /home/ubuntu '
- '-i {} -s trusty -r {} -u {}'.format(
- image_alt.id, region_name, self.public_auth_url))
+ f'-i {image_alt.id} -s trusty -r {region_name} '
+ f'-u {self.public_auth_url}')
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
return image_alt
@@ -236,18 +236,16 @@ class JujuEpc(singlevm.SingleVm2):
region_name = self.cloud.region_name if self.cloud.region_name else (
'RegionOne')
(_, stdout, stderr) = self.ssh.exec_command(
- 'timeout {} '
- '/snap/bin/juju bootstrap abot-epc/{} abot-controller '
+ f'timeout {JujuEpc.juju_timeout} '
+ f'/snap/bin/juju bootstrap abot-epc/{region_name} abot-controller '
'--agent-version 2.3.9 --metadata-source /home/ubuntu '
'--constraints mem=2G --bootstrap-series xenial '
- '--config network={} '
+ f'--config network={self.network.id} '
'--config ssl-hostname-verification=false '
- '--config external-network={} '
+ f'--config external-network={self.ext_net.id} '
'--config use-floating-ip=true '
'--config use-default-secgroup=true '
- '--debug'.format(
- JujuEpc.juju_timeout, region_name, self.network.id,
- self.ext_net.id))
+ '--debug')
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
return not stdout.channel.recv_exit_status()
@@ -256,14 +254,14 @@ class JujuEpc(singlevm.SingleVm2):
"""Check application status."""
for i in range(10):
(_, stdout, stderr) = self.ssh.exec_command(
- '/snap/bin/juju status --format short {}'.format(name))
+ f'/snap/bin/juju status --format short {name}')
output = stdout.read().decode("utf-8")
self.__logger.debug("stdout:\n%s", output)
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
if stdout.channel.recv_exit_status():
continue
ret = re.search(
- r'(?=workload:({})\))'.format(status), output)
+ rf'(?=workload:({status})\))', output)
if ret:
self.__logger.info("%s workload is %s", name, status)
break
@@ -295,7 +293,7 @@ class JujuEpc(singlevm.SingleVm2):
return not stdout.channel.recv_exit_status()
(_, stdout, stderr) = self.ssh.exec_command(
'PATH=/snap/bin/:$PATH '
- 'timeout {} juju-wait'.format(JujuEpc.juju_timeout))
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
if stdout.channel.recv_exit_status():
@@ -312,12 +310,11 @@ class JujuEpc(singlevm.SingleVm2):
return False
scpc = scp.SCPClient(self.ssh.get_transport())
scpc.put(
- '{}/featureFiles'.format(self.case_dir), remote_path='~/',
+ f'{self.case_dir}/featureFiles', remote_path='~/',
recursive=True)
(_, stdout, stderr) = self.ssh.exec_command(
- 'timeout {} /snap/bin/juju scp -- -r -v ~/featureFiles '
- 'abot-epc-basic/0:/etc/rebaca-test-suite/'.format(
- JujuEpc.juju_timeout))
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp -- -r -v '
+ '~/featureFiles abot-epc-basic/0:/etc/rebaca-test-suite/')
output = stdout.read().decode("utf-8")
self.__logger.debug("stdout:\n%s", output)
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
@@ -327,15 +324,15 @@ class JujuEpc(singlevm.SingleVm2):
"""Run test on ABoT."""
start_time = time.time()
(_, stdout, stderr) = self.ssh.exec_command(
- '/snap/bin/juju run-action abot-epc-basic/0 '
- 'run tagnames={}'.format(self.details['test_vnf']['tag_name']))
+ "/snap/bin/juju run-action abot-epc-basic/0 "
+ f"run tagnames={self.details['test_vnf']['tag_name']}")
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
if stdout.channel.recv_exit_status():
return not stdout.channel.recv_exit_status()
(_, stdout, stderr) = self.ssh.exec_command(
'PATH=/snap/bin/:$PATH '
- 'timeout {} juju-wait'.format(JujuEpc.juju_timeout))
+ f'timeout {JujuEpc.juju_timeout} juju-wait')
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
if stdout.channel.recv_exit_status():
@@ -343,9 +340,9 @@ class JujuEpc(singlevm.SingleVm2):
duration = time.time() - start_time
self.__logger.info("Getting results from Abot node....")
(_, stdout, stderr) = self.ssh.exec_command(
- 'timeout {} /snap/bin/juju scp -- -v abot-epc-basic/0:'
- '/var/lib/abot-epc-basic/artifacts/TestResults.json .'.format(
- JujuEpc.juju_timeout))
+ f'timeout {JujuEpc.juju_timeout} /snap/bin/juju scp '
+ '-- -v abot-epc-basic/0:'
+ '/var/lib/abot-epc-basic/artifacts/TestResults.json .')
self.__logger.debug("stdout:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("stderr:\n%s", stderr.read().decode("utf-8"))
if stdout.channel.recv_exit_status():
@@ -353,8 +350,7 @@ class JujuEpc(singlevm.SingleVm2):
scpc = scp.SCPClient(self.ssh.get_transport())
scpc.get('TestResults.json', self.res_dir)
self.__logger.info("Parsing the Test results...")
- res = (process_abot_test_result('{}/TestResults.json'.format(
- self.res_dir)))
+ res = process_abot_test_result(f'{self.res_dir}/TestResults.json')
short_result = sig_test_format(res)
self.__logger.info(short_result)
self.details['test_vnf'].update(
@@ -433,7 +429,7 @@ def sig_test_format(sig_test):
def process_abot_test_result(file_path):
""" Process ABoT Result """
- with open(file_path) as test_result:
+ with open(file_path, encoding='utf-8') as test_result:
data = json.load(test_result)
res = []
for tests in data:
diff --git a/functest/opnfv_tests/vnf/ims/clearwater.py b/functest/opnfv_tests/vnf/ims/clearwater.py
index 67128b11c..bbd5291eb 100644
--- a/functest/opnfv_tests/vnf/ims/clearwater.py
+++ b/functest/opnfv_tests/vnf/ims/clearwater.py
@@ -50,7 +50,7 @@ class ClearwaterTesting():
output_dict = {}
self.logger.debug('Ellis IP: %s', self.ellis_ip)
output_dict['ellis_ip'] = self.ellis_ip
- account_url = 'http://{0}/accounts'.format(self.ellis_ip)
+ account_url = f'http://{self.ellis_ip}/accounts'
params = {"password": "functest",
"full_name": "opnfv functest user",
"email": "functest@opnfv.org",
@@ -60,7 +60,7 @@ class ClearwaterTesting():
number_res = self._create_ellis_account(account_url, params)
output_dict['number'] = number_res
- session_url = 'http://{0}/session'.format(self.ellis_ip)
+ session_url = f'http://{self.ellis_ip}/session'
session_data = {
'username': params['email'],
'password': params['password'],
@@ -68,8 +68,8 @@ class ClearwaterTesting():
}
cookies = self._get_ellis_session_cookies(session_url, session_data)
- number_url = 'http://{0}/accounts/{1}/numbers'.format(
- self.ellis_ip, params['email'])
+ number_url = (
+ f"http://{self.ellis_ip}/accounts/{params['email']}/numbers")
self.logger.debug('Create 1st calling number on Ellis')
number_res = self._create_ellis_number(number_url, cookies)
@@ -97,8 +97,7 @@ class ClearwaterTesting():
"try %s: cannot create ellis account", iloop + 1)
time.sleep(30)
raise Exception(
- "Unable to create an account {}".format(
- params.get('full_name')))
+ f"Unable to create an account {params.get('full_name')}")
def _get_ellis_session_cookies(self, session_url, params):
i = 15
@@ -150,24 +149,20 @@ class ClearwaterTesting():
"""
# pylint: disable=too-many-locals,too-many-arguments
self.logger.info('Run Clearwater live test')
- script = ('cd {0};'
- 'rake test[{1}] SIGNUP_CODE={2}'
- .format(self.test_dir,
- public_domain,
- signup_code))
+ script = (f'cd {self.test_dir};'
+ f'rake test[{public_domain}] SIGNUP_CODE={signup_code}')
if self.bono_ip and self.ellis_ip:
- subscript = ' PROXY={0} ELLIS={1}'.format(
- self.bono_ip, self.ellis_ip)
- script = '{0}{1}'.format(script, subscript)
- script = ('{0}{1}'.format(script, ' --trace'))
- cmd = "/bin/bash -c '{0}'".format(script)
+ subscript = f' PROXY={self.bono_ip} ELLIS={self.ellis_ip}'
+ script = f'{script}{subscript}'
+ script = f'{script} --trace'
+ cmd = f"/bin/bash -c '{script}'"
self.logger.debug('Live test cmd: %s', cmd)
output_file = os.path.join(self.result_dir, "ims_test_output.txt")
ft_utils.execute_command(cmd,
error_msg='Clearwater live test failed',
output_file=output_file)
- with open(output_file, 'r') as ofile:
+ with open(output_file, 'r', encoding='utf-8') as ofile:
result = ofile.read()
if result != "":
diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
index 4428990a8..b93af7d6d 100644
--- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py
+++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py
@@ -58,7 +58,7 @@ class CloudifyIms(cloudify.Cloudify):
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
+ config.CONF, f'vnf_{self.case_name}_config')
except Exception as exc:
raise Exception("VNF config file not found") from exc
diff --git a/functest/opnfv_tests/vnf/ims/heat_ims.py b/functest/opnfv_tests/vnf/ims/heat_ims.py
index 60478adcb..0d4e345a0 100644
--- a/functest/opnfv_tests/vnf/ims/heat_ims.py
+++ b/functest/opnfv_tests/vnf/ims/heat_ims.py
@@ -62,7 +62,7 @@ class HeatIms(singlevm.VmReady2):
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
+ config.CONF, f'vnf_{self.case_name}_config')
except Exception as exc:
raise Exception("VNF config file not found") from exc
@@ -112,9 +112,10 @@ class HeatIms(singlevm.VmReady2):
project=self.project.project.id,
domain=self.project.domain.id)
self.keypair = self.cloud.create_keypair(
- '{}-kp_{}'.format(self.case_name, self.guid))
+ f'{self.case_name}-kp_{self.guid}')
self.__logger.info("keypair:\n%s", self.keypair.private_key)
- with open(self.key_filename, 'w') as private_key_file:
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
private_key_file.write(self.keypair.private_key)
if self.deploy_vnf() and self.test_vnf():
diff --git a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
index 93779f4f8..32d675347 100644
--- a/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
+++ b/functest/opnfv_tests/vnf/router/cloudify_vrouter.py
@@ -56,7 +56,7 @@ class CloudifyVrouter(cloudify.Cloudify):
# Retrieve the configuration
try:
self.config = getattr(
- config.CONF, 'vnf_{}_config'.format(self.case_name))
+ config.CONF, f'vnf_{self.case_name}_config')
except Exception as exc:
raise Exception("VNF config file not found") from exc
diff --git a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
index 7c532d9a3..9eb3c5d69 100644
--- a/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
+++ b/functest/opnfv_tests/vnf/router/test_controller/function_test_exec.py
@@ -36,12 +36,12 @@ class FunctionTestExec():
os.path.join(
self.util.vnf_data_dir, self.util.command_template_dir,
self.util.test_cmd_map_yaml_file),
- 'r') as test_cmd_map_file:
+ 'r', encoding='utf-8') as test_cmd_map_file:
self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)
self.util.set_credentials(credentials["cloud"])
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
diff --git a/functest/opnfv_tests/vnf/router/utilvnf.py b/functest/opnfv_tests/vnf/router/utilvnf.py
index 7339573d7..111f20c1a 100644
--- a/functest/opnfv_tests/vnf/router/utilvnf.py
+++ b/functest/opnfv_tests/vnf/router/utilvnf.py
@@ -64,7 +64,7 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
if not os.path.exists(self.vnf_data_dir):
os.makedirs(self.vnf_data_dir)
- with open(self.test_env_config_yaml) as file_fd:
+ with open(self.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -98,9 +98,7 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
return mac_address
def get_blueprint_outputs(self, cfy_manager_ip, deployment_name):
- url = "http://%s/deployments/%s/outputs" % (
- cfy_manager_ip, deployment_name)
-
+ url = f"http://{cfy_manager_ip}/deployments/{deployment_name}/outputs"
response = requests.get(
url,
auth=requests.auth.HTTPBasicAuth('admin', 'admin'),
@@ -212,20 +210,28 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
def write_result_data(self, result_data):
test_result = []
if not os.path.isfile(self.test_result_json_file):
- with open(self.test_result_json_file, "w") as file_fd:
+ with open(
+ self.test_result_json_file, "w",
+ encoding="utf-8") as file_fd:
pass
else:
- with open(self.test_result_json_file, "r") as file_fd:
+ with open(
+ self.test_result_json_file, "r",
+ encoding="utf-8") as file_fd:
test_result = json.load(file_fd)
test_result.append(result_data)
- with open(self.test_result_json_file, "w") as file_fd:
+ with open(
+ self.test_result_json_file, "w",
+ encoding="utf-8") as file_fd:
json.dump(test_result, file_fd)
def output_test_result_json(self):
if os.path.isfile(self.test_result_json_file):
- with open(self.test_result_json_file, "r") as file_fd:
+ with open(
+ self.test_result_json_file, "r",
+ encoding="utf-8") as file_fd:
test_result = json.load(file_fd)
output_json_data = json.dumps(test_result,
sort_keys=True,
@@ -236,6 +242,6 @@ class Utilvnf(): # pylint: disable=too-many-instance-attributes
@staticmethod
def get_test_scenario(file_path):
- with open(file_path, 'r') as test_scenario_file:
+ with open(file_path, "r", encoding="utf-8") as test_scenario_file:
test_scenario_yaml = yaml.safe_load(test_scenario_file)
return test_scenario_yaml["test_scenario_list"]
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
index 0969eab3b..269f6526b 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/ssh_client.py
@@ -43,7 +43,7 @@ class SshClient(): # pylint: disable=too-many-instance-attributes
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.util = Utilvnf()
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
index db276cfdb..2210b3909 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vm_controller.py
@@ -36,7 +36,7 @@ class VmController():
self.util = Utilvnf()
self.util.set_credentials(credentials["cloud"])
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -101,7 +101,7 @@ class VmController():
def command_create_and_execute(self, ssh, test_cmd_file_path,
cmd_input_param, prompt_file_path):
- with open(prompt_file_path, 'r') as prompt_file:
+ with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
prompt = yaml.safe_load(prompt_file)
config_mode_prompt = prompt["config_mode"]
diff --git a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
index 4ab394760..46584456f 100644
--- a/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
+++ b/functest/opnfv_tests/vnf/router/vnf_controller/vnf_controller.py
@@ -36,7 +36,7 @@ class VnfController():
self.util = Utilvnf()
self.vm_controller = VmController(util_info)
- with open(self.util.test_env_config_yaml) as file_fd:
+ with open(self.util.test_env_config_yaml, encoding='utf-8') as file_fd:
test_env_config_yaml = yaml.safe_load(file_fd)
file_fd.close()
@@ -49,7 +49,8 @@ class VnfController():
def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
parameter_file_path, prompt_file_path):
# pylint: disable=too-many-arguments
- with open(parameter_file_path, 'r') as parameter_file:
+ with open(
+ parameter_file_path, 'r', encoding='utf-8') as parameter_file:
cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
@@ -69,14 +70,15 @@ class VnfController():
res_dict_data_list = []
- with open(parameter_file_path, 'r') as parameter_file:
+ with open(
+ parameter_file_path, 'r', encoding='utf-8') as parameter_file:
cmd_input_param = yaml.safe_load(parameter_file)
cmd_input_param["source_ip"] = target_vnf["data_plane_network_ip"]
cmd_input_param["destination_ip"] = reference_vnf[
"data_plane_network_ip"]
- with open(prompt_file_path, 'r') as prompt_file:
+ with open(prompt_file_path, 'r', encoding='utf-8') as prompt_file:
prompt = yaml.safe_load(prompt_file)
terminal_mode_prompt = prompt["terminal_mode"]
diff --git a/functest/tests/unit/odl/test_odl.py b/functest/tests/unit/odl/test_odl.py
index 24ac04591..c675c2988 100644
--- a/functest/tests/unit/odl/test_odl.py
+++ b/functest/tests/unit/odl/test_odl.py
@@ -33,10 +33,10 @@ class ODLTesting(unittest.TestCase):
logging.disable(logging.CRITICAL)
_keystone_ip = "127.0.0.1"
- _neutron_url = u"https://127.0.0.1:9696"
- _neutron_id = u"dummy"
+ _neutron_url = "https://127.0.0.1:9696"
+ _neutron_id = "dummy"
_sdn_controller_ip = "127.0.0.3"
- _os_auth_url = "http://{}:5000/v3".format(_keystone_ip)
+ _os_auth_url = f"http://{_keystone_ip}:5000/v3"
_os_projectname = "admin"
_os_username = "admin"
_os_password = "admin"
@@ -63,8 +63,7 @@ class ODLTesting(unittest.TestCase):
self.test = odl.ODLTests(case_name='odl', project_name='functest')
self.defaultargs = {'odlusername': self._odl_username,
'odlpassword': self._odl_password,
- 'neutronurl': "http://{}:9696".format(
- self._keystone_ip),
+ 'neutronurl': f"http://{self._keystone_ip}:9696",
'osauthurl': self._os_auth_url,
'osusername': self._os_username,
'osuserdomainname': self._os_userdomainname,
@@ -105,7 +104,7 @@ class ODLRobotTesting(ODLTesting):
mock_method.assert_called_once_with(
os.path.join(odl.ODLTests.odl_test_repo,
'csit/variables/Variables.robot'), inplace=True)
- self.assertEqual(args[0].getvalue(), "{}\n".format(msg2))
+ self.assertEqual(args[0].getvalue(), f"{msg2}\n")
def test_set_vars_auth_default(self):
self._test_set_vars(
@@ -160,19 +159,19 @@ class ODLMainTesting(ODLTesting):
args[0].assert_called_once_with(self.test.odl_variables_file)
if len(args) > 1:
variable = [
- 'KEYSTONEURL:{}://{}'.format(
- urllib.parse.urlparse(self._os_auth_url).scheme,
- urllib.parse.urlparse(self._os_auth_url).netloc),
- 'NEUTRONURL:{}'.format(self._neutron_url),
- 'OS_AUTH_URL:"{}"'.format(self._os_auth_url),
- 'OSUSERNAME:"{}"'.format(self._os_username),
- 'OSUSERDOMAINNAME:"{}"'.format(self._os_userdomainname),
- 'OSTENANTNAME:"{}"'.format(self._os_projectname),
- 'OSPROJECTDOMAINNAME:"{}"'.format(self._os_projectdomainname),
- 'OSPASSWORD:"{}"'.format(self._os_password),
- 'ODL_SYSTEM_IP:{}'.format(self._sdn_controller_ip),
- 'PORT:{}'.format(self._odl_webport),
- 'RESTCONFPORT:{}'.format(self._odl_restconfport)]
+ ('KEYSTONEURL:'
+ f'{urllib.parse.urlparse(self._os_auth_url).scheme}://'
+ f'{urllib.parse.urlparse(self._os_auth_url).netloc}'),
+ f'NEUTRONURL:{self._neutron_url}',
+ f'OS_AUTH_URL:"{self._os_auth_url}"',
+ f'OSUSERNAME:"{self._os_username}"',
+ f'OSUSERDOMAINNAME:"{self._os_userdomainname}"',
+ f'OSTENANTNAME:"{self._os_projectname}"',
+ f'OSPROJECTDOMAINNAME:"{self._os_projectdomainname}"',
+ f'OSPASSWORD:"{self._os_password}"',
+ f'ODL_SYSTEM_IP:{self._sdn_controller_ip}',
+ f'PORT:{self._odl_webport}',
+ f'RESTCONFPORT:{self._odl_restconfport}']
args[1].assert_called_once_with(
odl.ODLTests.basic_suite_dir, odl.ODLTests.neutron_suite_dir,
include=[],
@@ -551,8 +550,8 @@ class ODLArgParserTesting(ODLTesting):
self.defaultargs['odlip'] = self._sdn_controller_ip
self.assertEqual(
self.parser.parse_args(
- ["--neutronurl={}".format(self._neutron_url),
- "--odlip={}".format(self._sdn_controller_ip)]),
+ [f"--neutronurl={self._neutron_url}",
+ f"--odlip={self._sdn_controller_ip}"]),
self.defaultargs)
@mock.patch('sys.stderr', new_callable=six.StringIO)
@@ -565,7 +564,7 @@ class ODLArgParserTesting(ODLTesting):
def _test_arg(self, arg, value):
self.defaultargs[arg] = value
self.assertEqual(
- self.parser.parse_args(["--{}={}".format(arg, value)]),
+ self.parser.parse_args([f"--{arg}={value}"]),
self.defaultargs)
def test_odlusername(self):
@@ -606,7 +605,7 @@ class ODLArgParserTesting(ODLTesting):
def test_pushtodb(self):
self.defaultargs['pushtodb'] = True
- self.assertEqual(self.parser.parse_args(["--{}".format('pushtodb')]),
+ self.assertEqual(self.parser.parse_args(["--pushtodb"]),
self.defaultargs)
def test_multiple_args(self):
@@ -614,8 +613,8 @@ class ODLArgParserTesting(ODLTesting):
self.defaultargs['odlip'] = self._sdn_controller_ip
self.assertEqual(
self.parser.parse_args(
- ["--neutronurl={}".format(self._neutron_url),
- "--odlip={}".format(self._sdn_controller_ip)]),
+ [f"--neutronurl={self._neutron_url}",
+ f"--odlip={self._sdn_controller_ip}"]),
self.defaultargs)
diff --git a/functest/tests/unit/openstack/cinder/test_cinder.py b/functest/tests/unit/openstack/cinder/test_cinder.py
index 4052408d9..d3c9cabb6 100644
--- a/functest/tests/unit/openstack/cinder/test_cinder.py
+++ b/functest/tests/unit/openstack/cinder/test_cinder.py
@@ -59,7 +59,7 @@ class CinderTesting(unittest.TestCase):
self.cinder.prepare()
args[0].assert_called_with()
args[1].assert_called_once_with(
- '{}-vm2_{}'.format(self.cinder.case_name, self.cinder.guid),
+ f'{self.cinder.case_name}-vm2_{self.cinder.guid}',
security_groups=[self.cinder.sec.id],
key_name=self.cinder.keypair.id)
self.cinder.cloud.create_volume.assert_not_called()
@@ -81,13 +81,12 @@ class CinderTesting(unittest.TestCase):
self.cinder.prepare()
args[0].assert_called_once_with()
args[1].assert_called_once_with(
- '{}-vm2_{}'.format(self.cinder.case_name, self.cinder.guid),
+ f'{self.cinder.case_name}-vm2_{self.cinder.guid}',
security_groups=[self.cinder.sec.id],
key_name=self.cinder.keypair.id)
self.cinder.connect.assert_called_once_with(args[1].return_value)
self.cinder.cloud.create_volume.assert_called_once_with(
- name='{}-volume_{}'.format(
- self.cinder.case_name, self.cinder.guid),
+ name=f'{self.cinder.case_name}-volume_{self.cinder.guid}',
size='2', timeout=self.cinder.volume_timeout, wait=True)
@mock.patch('scp.SCPClient.put')
@@ -101,7 +100,7 @@ class CinderTesting(unittest.TestCase):
self.cinder.ssh.exec_command.return_value = (None, stdout, mock.Mock())
self.assertEqual(self.cinder._write_data(), 0)
self.cinder.ssh.exec_command.assert_called_once_with(
- "sh ~/write_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+ f"sh ~/write_data.sh {env.get('VOLUME_DEVICE_NAME')}")
self.cinder.cloud.attach_volume.assert_called_once_with(
self.cinder.sshvm, self.cinder.volume,
timeout=self.cinder.volume_timeout)
@@ -138,7 +137,7 @@ class CinderTesting(unittest.TestCase):
stdout.channel.recv_exit_status.return_value = 0
self.assertEqual(self.cinder._read_data(), 0)
self.cinder.ssh2.exec_command.assert_called_once_with(
- "sh ~/read_data.sh {}".format(env.get('VOLUME_DEVICE_NAME')))
+ f"sh ~/read_data.sh {env.get('VOLUME_DEVICE_NAME')}")
self.cinder.cloud.attach_volume.assert_called_once_with(
self.cinder.vm2, self.cinder.volume,
timeout=self.cinder.volume_timeout)
diff --git a/functest/tests/unit/openstack/rally/test_rally.py b/functest/tests/unit/openstack/rally/test_rally.py
index c281d4f52..f3c2e7cf6 100644
--- a/functest/tests/unit/openstack/rally/test_rally.py
+++ b/functest/tests/unit/openstack/rally/test_rally.py
@@ -50,7 +50,7 @@ class OSRallyTesting(unittest.TestCase):
@staticmethod
def check_scenario_file(value):
- yaml_file = 'opnfv-{}.yaml'.format('test_file_name')
+ yaml_file = 'opnfv-test_file_name.yaml'
if yaml_file in value:
return False
return True
@@ -64,7 +64,7 @@ class OSRallyTesting(unittest.TestCase):
@staticmethod
def check_temp_dir(value):
- yaml_file = 'opnfv-{}.yaml'.format('test_file_name')
+ yaml_file = 'opnfv-test_file_name.yaml'
if yaml_file in value:
return True
return False
@@ -95,7 +95,7 @@ class OSRallyTesting(unittest.TestCase):
self, mock_method, mock_os_makedirs, mock_path_exists):
mock_path_exists.side_effect = self.check_temp_dir
- yaml_file = 'opnfv-{}.yaml'.format('test_file_name')
+ yaml_file = 'opnfv-test_file_name.yaml'
ret_val = os.path.join(self.rally_base.temp_dir, yaml_file)
self.assertEqual(self.rally_base._prepare_test_list('test_file_name'),
ret_val)
@@ -423,8 +423,8 @@ class OSRallyTesting(unittest.TestCase):
@mock.patch('subprocess.check_output',
side_effect=subprocess.CalledProcessError('', ''))
def test_export_task_ko(self, *args):
- file_name = "{}/{}.html".format(
- self.rally_base.results_dir, self.rally_base.case_name)
+ file_name = (f"{self.rally_base.results_dir}/"
+ f"{self.rally_base.case_name}.html")
with self.assertRaises(subprocess.CalledProcessError):
self.rally_base.export_task(file_name)
cmd = ["rally", "task", "export", "--type", "html", "--deployment",
@@ -434,8 +434,8 @@ class OSRallyTesting(unittest.TestCase):
@mock.patch('subprocess.check_output', return_value=b'')
def test_export_task(self, *args):
- file_name = "{}/{}.html".format(
- self.rally_base.results_dir, self.rally_base.case_name)
+ file_name = (f"{self.rally_base.results_dir}/"
+ f"{self.rally_base.case_name}.html")
self.assertEqual(self.rally_base.export_task(file_name), None)
cmd = ["rally", "task", "export", "--type", "html", "--deployment",
str(getattr(config.CONF, 'rally_deployment_name')),
@@ -445,8 +445,8 @@ class OSRallyTesting(unittest.TestCase):
@mock.patch('subprocess.check_output',
side_effect=subprocess.CalledProcessError('', ''))
def test_verify_report_ko(self, *args):
- file_name = "{}/{}.html".format(
- self.rally_base.results_dir, self.rally_base.case_name)
+ file_name = (f"{self.rally_base.results_dir}/"
+ f"{self.rally_base.case_name}.html")
with self.assertRaises(subprocess.CalledProcessError):
self.rally_base.verify_report(file_name, "1")
cmd = ["rally", "verify", "report", "--type", "html", "--uuid", "1",
@@ -455,8 +455,8 @@ class OSRallyTesting(unittest.TestCase):
@mock.patch('subprocess.check_output', return_value=b'')
def test_verify_report(self, *args):
- file_name = "{}/{}.html".format(
- self.rally_base.results_dir, self.rally_base.case_name)
+ file_name = (f"{self.rally_base.results_dir}/"
+ f"{self.rally_base.case_name}.html")
self.assertEqual(self.rally_base.verify_report(file_name, "1"), None)
cmd = ["rally", "verify", "report", "--type", "html", "--uuid", "1",
"--to", file_name]
diff --git a/functest/tests/unit/openstack/tempest/test_tempest.py b/functest/tests/unit/openstack/tempest/test_tempest.py
index e27b52ef4..efc4393c8 100644
--- a/functest/tests/unit/openstack/tempest/test_tempest.py
+++ b/functest/tests/unit/openstack/tempest/test_tempest.py
@@ -83,8 +83,8 @@ class OSTempestTesting(unittest.TestCase):
testr_mode = self.tempestcommon.mode
verifier_repo_dir = 'test_verifier_repo_dir'
self.tempestcommon.verifier_repo_dir = verifier_repo_dir
- cmd = "(cd {0}; stestr list '{1}' >{2} 2>/dev/null)".format(
- verifier_repo_dir, testr_mode, self.tempestcommon.list)
+ cmd = (f"(cd {verifier_repo_dir}; stestr list '{testr_mode}' > "
+ f"{self.tempestcommon.list} 2>/dev/null)")
self.tempestcommon.generate_test_list(mode=testr_mode)
args[0].assert_called_once_with(cmd, shell=True)
args[2].assert_called_once_with('/etc/tempest.conf')
diff --git a/functest/tests/unit/openstack/vmtp/test_vmtp.py b/functest/tests/unit/openstack/vmtp/test_vmtp.py
index c1ad30afd..850273476 100644
--- a/functest/tests/unit/openstack/vmtp/test_vmtp.py
+++ b/functest/tests/unit/openstack/vmtp/test_vmtp.py
@@ -66,10 +66,12 @@ class VmtpTesting(unittest.TestCase):
def test_generate_keys1(self, *args):
self.testcase.generate_keys()
self.testcase.cloud.create_keypair.assert_called_once_with(
- 'vmtp_{}'.format(self.testcase.guid))
+ f'vmtp_{self.testcase.guid}')
self.testcase.cloud.delete_keypair.assert_called_once_with('id')
- calls = [mock.call(self.testcase.privkey_filename, 'w'),
- mock.call(self.testcase.pubkey_filename, 'w')]
+ calls = [mock.call(
+ self.testcase.privkey_filename, 'w', encoding='utf-8'),
+ mock.call(
+ self.testcase.pubkey_filename, 'w', encoding='utf-8')]
args[0].assert_has_calls(calls, any_order=True)
@mock.patch('six.moves.builtins.open')
@@ -79,7 +81,7 @@ class VmtpTesting(unittest.TestCase):
side_effect=shade.OpenStackCloudException(None)) as mock_obj, \
self.assertRaises(shade.OpenStackCloudException):
self.testcase.generate_keys()
- mock_obj.assert_called_once_with('vmtp_{}'.format(self.testcase.guid))
+ mock_obj.assert_called_once_with(f'vmtp_{self.testcase.guid}')
args[0].assert_not_called()
diff --git a/functest/tests/unit/openstack/vping/test_vping_ssh.py b/functest/tests/unit/openstack/vping/test_vping_ssh.py
index bc1148da4..a07148aab 100644
--- a/functest/tests/unit/openstack/vping/test_vping_ssh.py
+++ b/functest/tests/unit/openstack/vping/test_vping_ssh.py
@@ -47,7 +47,7 @@ class VpingSSHTesting(unittest.TestCase):
self.vping.prepare()
args[0].assert_called_once_with()
args[1].assert_called_once_with(
- '{}-vm2_{}'.format(self.vping.case_name, self.vping.guid),
+ f'{self.vping.case_name}-vm2_{self.vping.guid}',
security_groups=[self.vping.sec.id])
@mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
@@ -58,7 +58,7 @@ class VpingSSHTesting(unittest.TestCase):
self.vping.prepare()
args[0].assert_called_once_with()
args[1].assert_called_once_with(
- '{}-vm2_{}'.format(self.vping.case_name, self.vping.guid),
+ f'{self.vping.case_name}-vm2_{self.vping.guid}',
security_groups=[self.vping.sec.id])
@mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
@@ -70,7 +70,7 @@ class VpingSSHTesting(unittest.TestCase):
with self.assertRaises(ssh_exception.SSHException):
self.vping.execute()
self.vping.ssh.exec_command.assert_called_once_with(
- 'ping -c 1 {}'.format(self.vping.vm2.private_v4))
+ f'ping -c 1 {self.vping.vm2.private_v4}')
args[0].assert_called_once_with('foo')
@mock.patch('functest.opnfv_tests.openstack.vping.vping_ssh.VPingSSH.'
@@ -94,7 +94,7 @@ class VpingSSHTesting(unittest.TestCase):
self.assertEqual(self.vping.execute(), ret)
mock_check.assert_called_once_with('foo')
self.vping.ssh.exec_command.assert_called_once_with(
- 'ping -c 1 {}'.format(self.vping.vm2.private_v4))
+ f'ping -c 1 {self.vping.vm2.private_v4}')
def test_execute1(self):
self._test_execute()
diff --git a/functest/tests/unit/utils/test_functest_utils.py b/functest/tests/unit/utils/test_functest_utils.py
index 1fab8f175..07a57a49a 100644
--- a/functest/tests/unit/utils/test_functest_utils.py
+++ b/functest/tests/unit/utils/test_functest_utils.py
@@ -108,9 +108,10 @@ class FunctestUtilsTesting(unittest.TestCase):
self.cmd, info=True, error_msg=self.error_msg, verbose=True,
output_file=self.output_file)
self.assertEqual(resp, 1)
- msg_exec = ("Executing command: '%s'" % self.cmd)
+ msg_exec = f"Executing command: '{self.cmd}'"
mock_logger_info.assert_called_once_with(msg_exec)
- mopen.assert_called_once_with(self.output_file, "w")
+ mopen.assert_called_once_with(
+ self.output_file, "w", encoding='utf-8')
mock_logger_error.assert_called_once_with(self.error_msg)
@mock.patch('functest.utils.functest_utils.LOGGER.info')
@@ -129,9 +130,10 @@ class FunctestUtilsTesting(unittest.TestCase):
self.cmd, info=True, error_msg=self.error_msg, verbose=True,
output_file=self.output_file)
self.assertEqual(resp, 0)
- msg_exec = ("Executing command: '%s'" % self.cmd)
+ msg_exec = (f"Executing command: '{self.cmd}'")
mock_logger_info.assert_called_once_with(msg_exec)
- mopen.assert_called_once_with(self.output_file, "w")
+ mopen.assert_called_once_with(
+ self.output_file, "w", encoding='utf-8')
@mock.patch('sys.stdout')
def test_exec_cmd_args_missing_ok(self, stdout=None):
@@ -175,9 +177,9 @@ class FunctestUtilsTesting(unittest.TestCase):
mock_yaml.return_value = self.file_yaml
functest_utils.get_parameter_from_yaml(self.parameter,
self.test_file)
- self.assertTrue(("The parameter %s is not"
- " defined in config_functest.yaml" %
- self.parameter) in excep.exception)
+ self.assertTrue((f"The parameter {self.parameter} is not"
+ " defined in config_functest.yaml"
+ ) in excep.exception)
def test_get_param_from_yaml_def(self):
with mock.patch('six.moves.builtins.open', mock.mock_open()), \
diff --git a/functest/utils/config.py b/functest/utils/config.py
index 3226b2d1f..40414b88b 100644
--- a/functest/utils/config.py
+++ b/functest/utils/config.py
@@ -14,11 +14,12 @@ class Config():
def __init__(self):
try:
with open(pkg_resources.resource_filename(
- 'functest', 'ci/config_functest.yaml')) as yfile:
+ 'functest', 'ci/config_functest.yaml'),
+ encoding='utf-8') as yfile:
self.functest_yaml = yaml.safe_load(yfile)
except Exception as error:
raise Exception(
- 'Parse config failed: {}'.format(str(error))) from error
+ f'Parse config failed: {str(error)}') from error
@staticmethod
def _merge_dicts(dict1, dict2):
@@ -34,7 +35,7 @@ class Config():
yield (k, dict2[k])
def patch_file(self, patch_file_path):
- with open(patch_file_path) as yfile:
+ with open(patch_file_path, encoding='utf-8') as yfile:
patch_file = yaml.safe_load(yfile)
for key in patch_file:
@@ -53,14 +54,14 @@ class Config():
@staticmethod
def _get_attr_further(attr_now, next): # pylint: disable=redefined-builtin
return attr_now if next == 'general' else (
- '{}_{}'.format(attr_now, next) if attr_now else next)
+ f'{attr_now}_{next}' if attr_now else next)
def fill(self):
try:
self._parse(None, self.functest_yaml)
except Exception as error:
raise Exception(
- 'Parse config failed: {}'.format(str(error))) from error
+ f'Parse config failed: {str(error)}') from error
CONF = Config()
diff --git a/functest/utils/functest_utils.py b/functest/utils/functest_utils.py
index 31e453504..4078fb121 100644
--- a/functest/utils/functest_utils.py
+++ b/functest/utils/functest_utils.py
@@ -32,8 +32,8 @@ def execute_command_raise(cmd, info=False, error_msg="",
def execute_command(cmd, info=False, error_msg="",
verbose=True, output_file=None):
if not error_msg:
- error_msg = ("The command '%s' failed." % cmd)
- msg_exec = ("Executing command: '%s'" % cmd)
+ error_msg = f"The command '{cmd}' failed."
+ msg_exec = f"Executing command: '{cmd}'"
if verbose:
if info:
LOGGER.info(msg_exec)
@@ -43,7 +43,7 @@ def execute_command(cmd, info=False, error_msg="",
cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) as popen:
if output_file:
- with open(output_file, "w") as ofd:
+ with open(output_file, "w", encoding='utf-8') as ofd:
for line in iter(popen.stdout.readline, b''):
if output_file:
ofd.write(line.decode("utf-8"))
@@ -65,14 +65,14 @@ def get_parameter_from_yaml(parameter, yfile):
parameter must be given in string format with dots
Example: general.openstack.image_name
"""
- with open(yfile) as yfd:
+ with open(yfile, encoding='utf-8') as yfd:
file_yaml = yaml.safe_load(yfd)
value = file_yaml
for element in parameter.split("."):
value = value.get(element)
if value is None:
- raise ValueError("The parameter %s is not defined in"
- " %s" % (parameter, yfile))
+ raise ValueError(f"The parameter {parameter} is not defined in"
+ f" {yfile}")
return value
@@ -195,14 +195,13 @@ def search_services(cloud, name_or_id=None, filters=None):
def convert_dict_to_ini(value):
"Convert dict to oslo.conf input"
assert isinstance(value, dict)
- return ",".join("{}:{}".format(
- key, val) for (key, val) in six.iteritems(value))
+ return ",".join(f"{key}:{val}" for (key, val) in six.iteritems(value))
def convert_list_to_ini(value):
"Convert list to oslo.conf input"
assert isinstance(value, list)
- return ",".join("{}".format(val) for val in value)
+ return ",".join(val for val in value)
def convert_ini_to_dict(value):
diff --git a/test-requirements.txt b/test-requirements.txt
index 1760ff427..e3cf977e8 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -9,7 +9,6 @@ pylint # GPLv2
sphinx!=1.6.6,!=1.6.7,!=2.1.0,!=3.0.0,!=3.4.2 # BSD
sphinx-rtd-theme
yamllint
-ansible-lint
doc8 # Apache-2.0
bashate # Apache-2.0
bandit
diff --git a/tox.ini b/tox.ini
index c52fad15d..2c643f39f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = docs,pep8,pylint,yamllint,ansiblelint,bashate,bandit,py39,cover,perm
+envlist = docs,pep8,pylint,yamllint,bashate,bandit,py39,cover,perm
[testenv]
pip_version = pip==20.2.4
@@ -57,11 +57,6 @@ files =
commands =
yamllint -s {[testenv:yamllint]files}
-[testenv:ansiblelint]
-basepython = python3.9
-commands =
- ansible-lint -x303 ansible/site.yml
-
[testenv:py37]
commands = nosetests functest/tests/unit
diff --git a/upper-constraints.txt b/upper-constraints.txt
index 9ac9f2a28..6c5bdbbcf 100644
--- a/upper-constraints.txt
+++ b/upper-constraints.txt
@@ -1,22 +1,21 @@
git+https://gerrit.opnfv.org/gerrit/functest#egg=functest
git+https://github.com/collivier/cloudify-rest-client.git@4.3.3-py3#egg=cloudify-rest-client
-robotframework===3.1.1
+robotframework===4.1.2
robotframework-httplibrary===0.4.2
-robotframework-requests===0.5.0
-robotframework-sshlibrary===3.3.0
-ansible===2.9.2
+robotframework-requests===0.9.2
+robotframework-sshlibrary===4.1.2
xtesting===0.93.0
git+https://github.com/PyCQA/bandit@3d0824676974e7e2e9635c10bc4f12e261f1dbdf#egg=bandit
-bandit===1.1.0
+bandit===1.7.0
ruamel.yaml.jinja2==0.2.2
-e git+https://opendev.org/openstack/tempest#egg=tempest
-e git+https://opendev.org/openstack/rally.git#egg=rally
git+https://opendev.org/openstack/rally-openstack.git#egg=rally-openstack
git+https://github.com/xrally/xrally-kubernetes.git#egg=xrally-kubernetes
-pylint===2.9.6
-flake8===3.7.9
+pylint===2.11.1
+flake8===4.0.1
nose===1.3.7
-ruamel.yaml===0.15.100
+ruamel.yaml===0.17.17
sphinxcontrib-spelling===4.3.0
-ansible-lint===4.2.0
-setuptools_scm===6.0.1
+ansible-lint===5.2.1
+setuptools_scm===6.3.2