aboutsummaryrefslogtreecommitdiffstats
path: root/functest/core
diff options
context:
space:
mode:
Diffstat (limited to 'functest/core')
-rw-r--r--functest/core/cloudify.py32
-rw-r--r--functest/core/singlevm.py99
-rw-r--r--functest/core/tenantnetwork.py53
3 files changed, 91 insertions, 93 deletions
diff --git a/functest/core/cloudify.py b/functest/core/cloudify.py
index 0fb4f6eca..966d33645 100644
--- a/functest/core/cloudify.py
+++ b/functest/core/cloudify.py
@@ -29,7 +29,7 @@ class Cloudify(singlevm.SingleVm2):
__logger = logging.getLogger(__name__)
filename = ('/home/opnfv/functest/images/'
- 'ubuntu-16.04-server-cloudimg-amd64-disk1.img')
+ 'ubuntu-18.04-server-cloudimg-amd64.img')
flavor_ram = 4096
flavor_vcpus = 2
flavor_disk = 40
@@ -46,11 +46,11 @@ class Cloudify(singlevm.SingleVm2):
"""Initialize Cloudify testcase object."""
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify"
- super(Cloudify, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.cfy_client = None
def prepare(self):
- super(Cloudify, self).prepare()
+ super().prepare()
for port in self.ports:
self.cloud.create_security_group_rule(
self.sec.id, port_range_min=port, port_range_max=port,
@@ -64,20 +64,19 @@ class Cloudify(singlevm.SingleVm2):
scpc.put(self.cloudify_archive,
remote_path=os.path.basename(self.cloudify_archive))
(_, stdout, stderr) = self.ssh.exec_command(
- "sudo wget https://get.docker.com/ -O script.sh && "
- "sudo chmod +x script.sh && "
- "sudo ./script.sh && "
- "sudo docker load -i ~/{} && "
+ "sudo apt-get update && "
+ "sudo apt-get install -y docker.io && "
+ "sudo docker load -i "
+ f"~/{os.path.basename(self.cloudify_archive)} && "
"sudo docker run --name cfy_manager_local -d "
"--restart unless-stopped -v /sys/fs/cgroup:/sys/fs/cgroup:ro "
"--tmpfs /run --tmpfs /run/lock --security-opt seccomp:unconfined "
- "--cap-add SYS_ADMIN --network=host {}".format(
- os.path.basename(self.cloudify_archive),
- self.cloudify_container))
+ f"--cap-add SYS_ADMIN --network=host {self.cloudify_container}")
self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
self.cfy_client = CloudifyClient(
- host=self.fip.floating_ip_address,
+ host=self.fip.floating_ip_address if self.fip else (
+ self.sshvm.public_v4),
username='admin', password='admin', tenant='default_tenant')
self.__logger.info("Attemps running status of the Manager")
secret_key = "foo"
@@ -131,8 +130,8 @@ class Cloudify(singlevm.SingleVm2):
"""Upload Cloudify plugins"""
(_, stdout, stderr) = self.ssh.exec_command(
"sudo docker exec cfy_manager_local "
- "cfy plugins upload -y {} {} && "
- "sudo docker exec cfy_manager_local cfy status".format(yaml, wgn))
+ f"cfy plugins upload -y {yaml} {wgn} && "
+ "sudo docker exec cfy_manager_local cfy status")
self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
@@ -188,9 +187,8 @@ def wait_for_execution(client, execution, logger, timeout=3600, ):
if timeout is not None:
if time.time() > deadline:
raise RuntimeError(
- 'execution of operation {0} for deployment {1} '
- 'timed out'.format(execution.workflow_id,
- execution.deployment_id))
+ 'execution of operation {execution.workflow_id} for '
+ 'deployment {execution.deployment_id} timed out')
# update the remaining timeout
timeout = deadline - time.time()
@@ -218,4 +216,4 @@ def get_execution_id(client, deployment_id):
return execution
raise RuntimeError('Failed to get create_deployment_environment '
'workflow execution.'
- 'Available executions: {0}'.format(executions))
+ f'Available executions: {executions}')
diff --git a/functest/core/singlevm.py b/functest/core/singlevm.py
index bfaa53bfc..4bce516d3 100644
--- a/functest/core/singlevm.py
+++ b/functest/core/singlevm.py
@@ -39,7 +39,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
# pylint: disable=too-many-instance-attributes
__logger = logging.getLogger(__name__)
- filename = '/home/opnfv/functest/images/cirros-0.4.0-x86_64-disk.img'
+ filename = '/home/opnfv/functest/images/cirros-0.6.1-x86_64-disk.img'
image_format = 'qcow2'
extra_properties = {}
filename_alt = filename
@@ -59,7 +59,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmready1'
- super(VmReady1, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.image = None
self.flavor = None
@@ -80,19 +80,18 @@ class VmReady1(tenantnetwork.TenantNetwork1):
functest_utils.convert_ini_to_dict(
env.get('IMAGE_PROPERTIES')))
extra_properties.update(
- getattr(config.CONF, '{}_extra_properties'.format(
- self.case_name), {}))
+ getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
image = self.cloud.create_image(
- name if name else '{}-img_{}'.format(self.case_name, self.guid),
+ name if name else f'{self.case_name}-img_{self.guid}',
filename=getattr(
- config.CONF, '{}_image'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image',
self.filename),
meta=extra_properties,
disk_format=getattr(
- config.CONF, '{}_image_format'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_format',
self.image_format),
visibility=getattr(
- config.CONF, '{}_visibility'.format(self.case_name),
+ config.CONF, f'{self.case_name}_visibility',
self.visibility),
wait=True)
self.__logger.debug("image: %s", image)
@@ -115,20 +114,18 @@ class VmReady1(tenantnetwork.TenantNetwork1):
functest_utils.convert_ini_to_dict(
env.get('IMAGE_PROPERTIES')))
extra_alt_properties.update(
- getattr(config.CONF, '{}_extra_alt_properties'.format(
- self.case_name), {}))
+ getattr(config.CONF, f'{self.case_name}_extra_alt_properties', {}))
image = self.cloud.create_image(
- name if name else '{}-img_alt_{}'.format(
- self.case_name, self.guid),
+ name if name else f'{self.case_name}-img_alt_{self.guid}',
filename=getattr(
- config.CONF, '{}_image_alt'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_alt',
self.filename_alt),
meta=extra_alt_properties,
disk_format=getattr(
- config.CONF, '{}_image_alt_format'.format(self.case_name),
+ config.CONF, f'{self.case_name}_image_alt_format',
self.image_format),
visibility=getattr(
- config.CONF, '{}_visibility'.format(self.case_name),
+ config.CONF, f'{self.case_name}_visibility',
self.visibility),
wait=True)
self.__logger.debug("image: %s", image)
@@ -146,12 +143,12 @@ class VmReady1(tenantnetwork.TenantNetwork1):
"""
assert self.orig_cloud
flavor = self.orig_cloud.create_flavor(
- name if name else '{}-flavor_{}'.format(self.case_name, self.guid),
- getattr(config.CONF, '{}_flavor_ram'.format(self.case_name),
+ name if name else f'{self.case_name}-flavor_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_ram',
self.flavor_ram),
- getattr(config.CONF, '{}_flavor_vcpus'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_vcpus',
self.flavor_vcpus),
- getattr(config.CONF, '{}_flavor_disk'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_disk',
self.flavor_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_extra_specs = self.flavor_extra_specs.copy()
@@ -161,7 +158,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
env.get('FLAVOR_EXTRA_SPECS')))
flavor_extra_specs.update(
getattr(config.CONF,
- '{}_flavor_extra_specs'.format(self.case_name), {}))
+ f'{self.case_name}_flavor_extra_specs', {}))
self.orig_cloud.set_flavor_specs(flavor.id, flavor_extra_specs)
return flavor
@@ -177,13 +174,12 @@ class VmReady1(tenantnetwork.TenantNetwork1):
"""
assert self.orig_cloud
flavor = self.orig_cloud.create_flavor(
- name if name else '{}-flavor_alt_{}'.format(
- self.case_name, self.guid),
- getattr(config.CONF, '{}_flavor_alt_ram'.format(self.case_name),
+ name if name else f'{self.case_name}-flavor_alt_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_ram',
self.flavor_alt_ram),
- getattr(config.CONF, '{}_flavor_alt_vcpus'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_vcpus',
self.flavor_alt_vcpus),
- getattr(config.CONF, '{}_flavor_alt_disk'.format(self.case_name),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_disk',
self.flavor_alt_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
@@ -193,7 +189,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
env.get('FLAVOR_EXTRA_SPECS')))
flavor_alt_extra_specs.update(
getattr(config.CONF,
- '{}_flavor_alt_extra_specs'.format(self.case_name), {}))
+ f'{self.case_name}_flavor_alt_extra_specs', {}))
self.orig_cloud.set_flavor_specs(
flavor.id, flavor_alt_extra_specs)
return flavor
@@ -210,9 +206,11 @@ class VmReady1(tenantnetwork.TenantNetwork1):
"""
assert self.cloud
vm1 = self.cloud.create_server(
- name if name else '{}-vm_{}'.format(self.case_name, self.guid),
+ name if name else f'{self.case_name}-vm_{self.guid}',
image=self.image.id, flavor=self.flavor.id,
- auto_ip=False, network=self.network.id,
+ auto_ip=False,
+ network=self.network.id if self.network else env.get(
+ "EXTERNAL_NETWORK"),
timeout=self.create_server_timeout, wait=True, **kwargs)
self.__logger.debug("vm: %s", vm1)
return vm1
@@ -287,7 +285,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(VmReady1, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.image = self.publish_image()
self.flavor = self.create_flavor()
@@ -304,7 +302,7 @@ class VmReady1(tenantnetwork.TenantNetwork1):
try:
assert self.orig_cloud
assert self.cloud
- super(VmReady1, self).clean()
+ super().clean()
if self.image:
self.cloud.delete_image(self.image.id)
if self.flavor:
@@ -330,7 +328,7 @@ class VmReady2(VmReady1):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmready2'
- super(VmReady2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.orig_cloud
self.project = tenantnetwork.NewProject(
@@ -344,7 +342,7 @@ class VmReady2(VmReady1):
def clean(self):
try:
- super(VmReady2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except
@@ -373,7 +371,7 @@ class SingleVm1(VmReady1):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'singlevm1'
- super(SingleVm1, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.sshvm = None
self.sec = None
self.fip = None
@@ -391,14 +389,15 @@ class SingleVm1(VmReady1):
"""
assert self.cloud
self.keypair = self.cloud.create_keypair(
- '{}-kp_{}'.format(self.case_name, self.guid))
+ f'{self.case_name}-kp_{self.guid}')
self.__logger.debug("keypair: %s", self.keypair)
self.__logger.debug("private_key:\n%s", self.keypair.private_key)
- with open(self.key_filename, 'w') as private_key_file:
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
private_key_file.write(self.keypair.private_key)
self.sec = self.cloud.create_security_group(
- '{}-sg_{}'.format(self.case_name, self.guid),
- 'created by OPNFV Functest ({})'.format(self.case_name))
+ f'{self.case_name}-sg_{self.guid}',
+ f'created by OPNFV Functest ({self.case_name})')
self.cloud.create_security_group_rule(
self.sec.id, port_range_min='22', port_range_max='22',
protocol='tcp', direction='ingress')
@@ -416,10 +415,12 @@ class SingleVm1(VmReady1):
- None on error
"""
assert vm1
- fip = self.cloud.create_floating_ip(
- network=self.ext_net.id, server=vm1, wait=True,
- timeout=self.create_floating_ip_timeout)
- self.__logger.debug("floating_ip: %s", fip)
+ fip = None
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ fip = self.cloud.create_floating_ip(
+ network=self.ext_net.id, server=vm1, wait=True,
+ timeout=self.create_floating_ip_timeout)
+ self.__logger.debug("floating_ip: %s", fip)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
for loop in range(self.ssh_connect_loops):
@@ -427,20 +428,20 @@ class SingleVm1(VmReady1):
p_console = self.cloud.get_server_console(vm1)
self.__logger.debug("vm console: \n%s", p_console)
ssh.connect(
- fip.floating_ip_address,
+ fip.floating_ip_address if fip else vm1.public_v4,
username=getattr(
config.CONF,
- '{}_image_user'.format(self.case_name), self.username),
+ f'{self.case_name}_image_user', self.username),
key_filename=self.key_filename,
timeout=getattr(
config.CONF,
- '{}_vm_ssh_connect_timeout'.format(self.case_name),
+ f'{self.case_name}_vm_ssh_connect_timeout',
self.ssh_connect_timeout))
break
except Exception as exc: # pylint: disable=broad-except
self.__logger.debug(
"try %s: cannot connect to %s: %s", loop + 1,
- fip.floating_ip_address, exc)
+ fip.floating_ip_address if fip else vm1.public_v4, exc)
time.sleep(9)
else:
self.__logger.error(
@@ -476,7 +477,7 @@ class SingleVm1(VmReady1):
status = testcase.TestCase.EX_RUN_ERROR
try:
assert self.cloud
- assert super(SingleVm1, self).run(
+ assert super().run(
**kwargs) == testcase.TestCase.EX_OK
self.result = 0
self.prepare()
@@ -507,7 +508,7 @@ class SingleVm1(VmReady1):
self.cloud.delete_security_group(self.sec.id)
if self.keypair:
self.cloud.delete_keypair(self.keypair.name)
- super(SingleVm1, self).clean()
+ super().clean()
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean all resources")
@@ -527,7 +528,7 @@ class SingleVm2(SingleVm1):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'singlevm2'
- super(SingleVm2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.orig_cloud
self.project = tenantnetwork.NewProject(
@@ -541,7 +542,7 @@ class SingleVm2(SingleVm1):
def clean(self):
try:
- super(SingleVm2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except
diff --git a/functest/core/tenantnetwork.py b/functest/core/tenantnetwork.py
index b4818d76f..3670dbe8a 100644
--- a/functest/core/tenantnetwork.py
+++ b/functest/core/tenantnetwork.py
@@ -61,13 +61,12 @@ class NewProject():
name_or_id=self.orig_cloud.auth.get(
"project_domain_name", "Default"))
self.project = self.orig_cloud.create_project(
- name='{}-project_{}'.format(self.case_name[:18], self.guid),
- description="Created by OPNFV Functest: {}".format(
- self.case_name),
+ name=f'{self.case_name[:18]}-project_{self.guid}',
+ description=f"Created by OPNFV Functest: {self.case_name}",
domain_id=self.domain.id)
self.__logger.debug("project: %s", self.project)
self.user = self.orig_cloud.create_user(
- name='{}-user_{}'.format(self.case_name, self.guid),
+ name=f'{self.case_name}-user_{self.guid}',
password=self.password,
domain_id=self.domain.id)
self.__logger.debug("user: %s", self.user)
@@ -77,7 +76,7 @@ class NewProject():
elif self.orig_cloud.get_role(self.default_member.lower()):
self.role_name = self.default_member.lower()
else:
- raise Exception("Cannot detect {}".format(self.default_member))
+ raise Exception(f"Cannot detect {self.default_member}")
except Exception: # pylint: disable=broad-except
self.__logger.info("Creating default role %s", self.default_member)
role = self.orig_cloud.create_role(self.default_member)
@@ -146,29 +145,28 @@ class TenantNetwork1(testcase.TestCase):
__logger = logging.getLogger(__name__)
cidr = '192.168.120.0/24'
shared_network = False
- allow_no_fip = False
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork1'
- super(TenantNetwork1, self).__init__(**kwargs)
+ super().__init__(**kwargs)
self.dir_results = os.path.join(getattr(config.CONF, 'dir_results'))
self.res_dir = os.path.join(self.dir_results, self.case_name)
self.output_log_name = 'functest.log'
self.output_debug_log_name = 'functest.debug.log'
+ self.ext_net = None
try:
cloud_config = os_client_config.get_config()
self.cloud = self.orig_cloud = shade.OpenStackCloud(
cloud_config=cloud_config)
except Exception: # pylint: disable=broad-except
self.cloud = self.orig_cloud = None
- self.ext_net = None
self.__logger.exception("Cannot connect to Cloud")
- try:
- self.ext_net = self.get_external_network(self.cloud)
- except Exception: # pylint: disable=broad-except
- self.ext_net = None
- self.__logger.exception("Cannot get the external network")
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ try:
+ self.ext_net = self.get_external_network(self.cloud)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot get the external network")
self.guid = str(uuid.uuid4())
self.network = None
self.subnet = None
@@ -220,18 +218,18 @@ class TenantNetwork1(testcase.TestCase):
Raises: expection on error
"""
assert self.cloud
- if not self.allow_no_fip:
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
assert self.ext_net
provider = {}
- if hasattr(config.CONF, '{}_network_type'.format(self.case_name)):
+ if hasattr(config.CONF, f'{self.case_name}_network_type'):
provider["network_type"] = getattr(
- config.CONF, '{}_network_type'.format(self.case_name))
- if hasattr(config.CONF, '{}_physical_network'.format(self.case_name)):
+ config.CONF, f'{self.case_name}_network_type')
+ if hasattr(config.CONF, f'{self.case_name}_physical_network'):
provider["physical_network"] = getattr(
- config.CONF, '{}_physical_network'.format(self.case_name))
- if hasattr(config.CONF, '{}_segmentation_id'.format(self.case_name)):
+ config.CONF, f'{self.case_name}_physical_network')
+ if hasattr(config.CONF, f'{self.case_name}_segmentation_id'):
provider["segmentation_id"] = getattr(
- config.CONF, '{}_segmentation_id'.format(self.case_name))
+ config.CONF, f'{self.case_name}_segmentation_id')
domain = self.orig_cloud.get_domain(
name_or_id=self.orig_cloud.auth.get(
"project_domain_name", "Default"))
@@ -239,23 +237,23 @@ class TenantNetwork1(testcase.TestCase):
self.cloud.auth['project_name'],
domain_id=domain.id)
self.network = self.orig_cloud.create_network(
- '{}-net_{}'.format(self.case_name, self.guid),
+ f'{self.case_name}-net_{self.guid}',
provider=provider, project_id=project.id,
shared=self.shared_network)
self.__logger.debug("network: %s", self.network)
self.subnet = self.cloud.create_subnet(
self.network.id,
- subnet_name='{}-subnet_{}'.format(self.case_name, self.guid),
+ subnet_name=f'{self.case_name}-subnet_{self.guid}',
cidr=getattr(
- config.CONF, '{}_private_subnet_cidr'.format(self.case_name),
+ config.CONF, f'{self.case_name}_private_subnet_cidr',
self.cidr),
enable_dhcp=True,
dns_nameservers=[env.get('NAMESERVER')])
self.__logger.debug("subnet: %s", self.subnet)
self.router = self.cloud.create_router(
- name='{}-router_{}'.format(self.case_name, self.guid),
+ name=f'{self.case_name}-router_{self.guid}',
ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
self.__logger.debug("router: %s", self.router)
self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)
@@ -265,7 +263,8 @@ class TenantNetwork1(testcase.TestCase):
try:
assert self.cloud
self.start_time = time.time()
- self.create_network_resources()
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ self.create_network_resources()
self.result = 100
status = testcase.TestCase.EX_OK
except Exception: # pylint: disable=broad-except
@@ -306,7 +305,7 @@ class TenantNetwork2(TenantNetwork1):
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork2'
- super(TenantNetwork2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.cloud
self.project = NewProject(
@@ -320,7 +319,7 @@ class TenantNetwork2(TenantNetwork1):
def clean(self):
try:
- super(TenantNetwork2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except