aboutsummaryrefslogtreecommitdiffstats
path: root/functest/core
diff options
context:
space:
mode:
Diffstat (limited to 'functest/core')
-rw-r--r--functest/core/cloudify.py219
-rw-r--r--functest/core/singlevm.py549
-rw-r--r--functest/core/tenantnetwork.py326
-rw-r--r--functest/core/vnf.py187
4 files changed, 1094 insertions, 187 deletions
diff --git a/functest/core/cloudify.py b/functest/core/cloudify.py
new file mode 100644
index 000000000..966d33645
--- /dev/null
+++ b/functest/core/cloudify.py
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Cloudify testcase implementation."""
+
+from __future__ import division
+
+import logging
+import os
+import time
+import traceback
+
+from cloudify_rest_client import CloudifyClient
+from cloudify_rest_client.executions import Execution
+import scp
+
+from functest.core import singlevm
+
+
+class Cloudify(singlevm.SingleVm2):
+ """Cloudify Orchestrator Case."""
+
+ __logger = logging.getLogger(__name__)
+
+ filename = ('/home/opnfv/functest/images/'
+ 'ubuntu-18.04-server-cloudimg-amd64.img')
+ flavor_ram = 4096
+ flavor_vcpus = 2
+ flavor_disk = 40
+ username = 'ubuntu'
+ ssh_connect_loops = 12
+ create_server_timeout = 600
+ ports = [80, 443, 5671, 53333]
+
+ cloudify_archive = ('/home/opnfv/functest/images/'
+ 'cloudify-docker-manager-community-19.01.24.tar')
+ cloudify_container = "docker-cfy-manager:latest"
+
+ def __init__(self, **kwargs):
+ """Initialize Cloudify testcase object."""
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = "cloudify"
+ super().__init__(**kwargs)
+ self.cfy_client = None
+
+ def prepare(self):
+ super().prepare()
+ for port in self.ports:
+ self.cloud.create_security_group_rule(
+ self.sec.id, port_range_min=port, port_range_max=port,
+ protocol='tcp', direction='ingress')
+
+ def execute(self):
+ """
+ Deploy Cloudify Manager.
+ """
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(self.cloudify_archive,
+ remote_path=os.path.basename(self.cloudify_archive))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo apt-get update && "
+ "sudo apt-get install -y docker.io && "
+ "sudo docker load -i "
+ f"~/{os.path.basename(self.cloudify_archive)} && "
+ "sudo docker run --name cfy_manager_local -d "
+ "--restart unless-stopped -v /sys/fs/cgroup:/sys/fs/cgroup:ro "
+ "--tmpfs /run --tmpfs /run/lock --security-opt seccomp:unconfined "
+ f"--cap-add SYS_ADMIN --network=host {self.cloudify_container}")
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+ self.cfy_client = CloudifyClient(
+ host=self.fip.floating_ip_address if self.fip else (
+ self.sshvm.public_v4),
+ username='admin', password='admin', tenant='default_tenant')
+ self.__logger.info("Attemps running status of the Manager")
+ secret_key = "foo"
+ secret_value = "bar"
+ for loop in range(20):
+ try:
+ self.__logger.debug(
+ "status %s", self.cfy_client.manager.get_status())
+ cfy_status = self.cfy_client.manager.get_status()['status']
+ self.__logger.info(
+ "The current manager status is %s", cfy_status)
+ if str(cfy_status) != 'running':
+ raise Exception("Cloudify Manager isn't up and running")
+ for secret in iter(self.cfy_client.secrets.list()):
+ if secret_key == secret["key"]:
+ self.__logger.debug("Updating secrets: %s", secret_key)
+ self.cfy_client.secrets.update(
+ secret_key, secret_value)
+ break
+ else:
+ self.__logger.debug("Creating secrets: %s", secret_key)
+ self.cfy_client.secrets.create(secret_key, secret_value)
+ self.cfy_client.secrets.delete(secret_key)
+ self.__logger.info("Secrets API successfully reached")
+ break
+ except Exception: # pylint: disable=broad-except
+ self.__logger.debug(
+ "try %s: Cloudify Manager isn't up and running \n%s",
+ loop + 1, traceback.format_exc())
+ time.sleep(30)
+ else:
+ self.__logger.error("Cloudify Manager isn't up and running")
+ return 1
+ self.__logger.info("Cloudify Manager is up and running")
+ return 0
+
+ def put_private_key(self):
+ """Put private keypair in manager"""
+ self.__logger.info("Put private keypair in manager")
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo docker cp ~/cloudify_ims.pem "
+ "cfy_manager_local:/etc/cloudify/ && "
+ "sudo docker exec cfy_manager_local "
+ "chmod 444 /etc/cloudify/cloudify_ims.pem")
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+
+ def upload_cfy_plugins(self, yaml, wgn):
+ """Upload Cloudify plugins"""
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo docker exec cfy_manager_local "
+ f"cfy plugins upload -y {yaml} {wgn} && "
+ "sudo docker exec cfy_manager_local cfy status")
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+
+ def kill_existing_execution(self, dep_name):
+ """kill existing execution"""
+ try:
+ self.__logger.info('Deleting the current deployment')
+ exec_list = self.cfy_client.executions.list()
+ for execution in exec_list:
+ if execution['status'] == "started":
+ try:
+ self.cfy_client.executions.cancel(
+ execution['id'], force=True)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.warning("Can't cancel the current exec")
+ execution = self.cfy_client.executions.start(
+ dep_name, 'uninstall', parameters=dict(ignore_failure=True))
+ wait_for_execution(self.cfy_client, execution, self.__logger)
+ self.cfy_client.deployments.delete(dep_name)
+ time.sleep(10)
+ self.cfy_client.blueprints.delete(dep_name)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Some issue during the undeployment ..")
+
+
+def wait_for_execution(client, execution, logger, timeout=3600, ):
+ """Wait for a workflow execution on Cloudify Manager."""
+ # if execution already ended - return without waiting
+ if execution.status in Execution.END_STATES:
+ return execution
+
+ if timeout is not None:
+ deadline = time.time() + timeout
+
+ # Poll for execution status and execution logs, until execution ends
+ # and we receive an event of type in WORKFLOW_END_TYPES
+ offset = 0
+ batch_size = 50
+ event_list = []
+ execution_ended = False
+ while True:
+ event_list = client.events.list(
+ execution_id=execution.id,
+ _offset=offset,
+ _size=batch_size,
+ include_logs=True,
+ sort='@timestamp').items
+
+ offset = offset + len(event_list)
+ for event in event_list:
+ logger.debug(event.get('message'))
+
+ if timeout is not None:
+ if time.time() > deadline:
+ raise RuntimeError(
+ 'execution of operation {execution.workflow_id} for '
+ 'deployment {execution.deployment_id} timed out')
+ # update the remaining timeout
+ timeout = deadline - time.time()
+
+ if not execution_ended:
+ execution = client.executions.get(execution.id)
+ execution_ended = execution.status in Execution.END_STATES
+
+ if execution_ended:
+ break
+
+ time.sleep(5)
+
+ return execution
+
+
+def get_execution_id(client, deployment_id):
+ """
+ Get the execution id of a env preparation.
+
+ network, security group, fip, VM creation
+ """
+ executions = client.executions.list(deployment_id=deployment_id)
+ for execution in executions:
+ if execution.workflow_id == 'create_deployment_environment':
+ return execution
+ raise RuntimeError('Failed to get create_deployment_environment '
+ 'workflow execution.'
+ f'Available executions: {executions}')
diff --git a/functest/core/singlevm.py b/functest/core/singlevm.py
new file mode 100644
index 000000000..4bce516d3
--- /dev/null
+++ b/functest/core/singlevm.py
@@ -0,0 +1,549 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Ease deploying a single VM reachable via ssh
+
+It offers a simple way to create all tenant network resources + a VM for
+advanced testcases (e.g. deploying an orchestrator).
+"""
+
+import logging
+import re
+import tempfile
+import time
+
+import paramiko
+from xtesting.core import testcase
+
+from functest.core import tenantnetwork
+from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
+
+
+class VmReady1(tenantnetwork.TenantNetwork1):
+ """Prepare a single VM (scenario1)
+
+ It inherits from TenantNetwork1 which creates all network resources and
+ prepares a future VM attached to that network.
+
+ It ensures that all testcases inheriting from SingleVm1 could work
+ without specific configurations (or at least read the same config data).
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ __logger = logging.getLogger(__name__)
+ filename = '/home/opnfv/functest/images/cirros-0.6.1-x86_64-disk.img'
+ image_format = 'qcow2'
+ extra_properties = {}
+ filename_alt = filename
+ image_alt_format = image_format
+ extra_alt_properties = extra_properties
+ visibility = 'private'
+ flavor_ram = 512
+ flavor_vcpus = 1
+ flavor_disk = 1
+ flavor_extra_specs = {}
+ flavor_alt_ram = 1024
+ flavor_alt_vcpus = 1
+ flavor_alt_disk = 1
+ flavor_alt_extra_specs = flavor_extra_specs
+ create_server_timeout = 180
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'vmready1'
+ super().__init__(**kwargs)
+ self.image = None
+ self.flavor = None
+
+ def publish_image(self, name=None):
+ """Publish image
+
+ It allows publishing multiple images for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: image
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ extra_properties = self.extra_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
+ extra_properties.update(
+ getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
+ image = self.cloud.create_image(
+ name if name else f'{self.case_name}-img_{self.guid}',
+ filename=getattr(
+ config.CONF, f'{self.case_name}_image',
+ self.filename),
+ meta=extra_properties,
+ disk_format=getattr(
+ config.CONF, f'{self.case_name}_image_format',
+ self.image_format),
+ visibility=getattr(
+ config.CONF, f'{self.case_name}_visibility',
+ self.visibility),
+ wait=True)
+ self.__logger.debug("image: %s", image)
+ return image
+
+ def publish_image_alt(self, name=None):
+ """Publish alternative image
+
+ It allows publishing multiple images for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: image
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ extra_alt_properties = self.extra_alt_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_alt_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
+ extra_alt_properties.update(
+ getattr(config.CONF, f'{self.case_name}_extra_alt_properties', {}))
+ image = self.cloud.create_image(
+ name if name else f'{self.case_name}-img_alt_{self.guid}',
+ filename=getattr(
+ config.CONF, f'{self.case_name}_image_alt',
+ self.filename_alt),
+ meta=extra_alt_properties,
+ disk_format=getattr(
+ config.CONF, f'{self.case_name}_image_alt_format',
+ self.image_format),
+ visibility=getattr(
+ config.CONF, f'{self.case_name}_visibility',
+ self.visibility),
+ wait=True)
+ self.__logger.debug("image: %s", image)
+ return image
+
+ def create_flavor(self, name=None):
+ """Create flavor
+
+ It allows creating multiple flavors for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: flavor
+
+ Raises: expection on error
+ """
+ assert self.orig_cloud
+ flavor = self.orig_cloud.create_flavor(
+ name if name else f'{self.case_name}-flavor_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_ram',
+ self.flavor_ram),
+ getattr(config.CONF, f'{self.case_name}_flavor_vcpus',
+ self.flavor_vcpus),
+ getattr(config.CONF, f'{self.case_name}_flavor_disk',
+ self.flavor_disk))
+ self.__logger.debug("flavor: %s", flavor)
+ flavor_extra_specs = self.flavor_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
+ flavor_extra_specs.update(
+ getattr(config.CONF,
+ f'{self.case_name}_flavor_extra_specs', {}))
+ self.orig_cloud.set_flavor_specs(flavor.id, flavor_extra_specs)
+ return flavor
+
+ def create_flavor_alt(self, name=None):
+ """Create flavor
+
+ It allows creating multiple alt flavors for the child testcases. It
+ forces the same configuration for all subtestcases.
+
+ Returns: flavor
+
+ Raises: expection on error
+ """
+ assert self.orig_cloud
+ flavor = self.orig_cloud.create_flavor(
+ name if name else f'{self.case_name}-flavor_alt_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_ram',
+ self.flavor_alt_ram),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_vcpus',
+ self.flavor_alt_vcpus),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_disk',
+ self.flavor_alt_disk))
+ self.__logger.debug("flavor: %s", flavor)
+ flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_alt_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
+ flavor_alt_extra_specs.update(
+ getattr(config.CONF,
+ f'{self.case_name}_flavor_alt_extra_specs', {}))
+ self.orig_cloud.set_flavor_specs(
+ flavor.id, flavor_alt_extra_specs)
+ return flavor
+
+ def boot_vm(self, name=None, **kwargs):
+ """Boot the virtual machine
+
+ It allows booting multiple machines for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: vm
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ vm1 = self.cloud.create_server(
+ name if name else f'{self.case_name}-vm_{self.guid}',
+ image=self.image.id, flavor=self.flavor.id,
+ auto_ip=False,
+ network=self.network.id if self.network else env.get(
+ "EXTERNAL_NETWORK"),
+ timeout=self.create_server_timeout, wait=True, **kwargs)
+ self.__logger.debug("vm: %s", vm1)
+ return vm1
+
+ def check_regex_in_console(self, name, regex=' login: ', loop=6):
+ """Wait for specific message in console
+
+ Returns: True or False on errors
+ """
+ assert self.cloud
+ for iloop in range(loop):
+ console = self.cloud.get_server_console(name)
+ self.__logger.debug("console: \n%s", console)
+ if re.search(regex, console):
+ self.__logger.debug(
+ "regex found: '%s' in console\n%s", regex, console)
+ return True
+ self.__logger.debug(
+ "try %s: cannot find regex '%s' in console\n%s",
+ iloop + 1, regex, console)
+ time.sleep(10)
+ self.__logger.error("cannot find regex '%s' in console", regex)
+ return False
+
+ def clean_orphan_security_groups(self):
+ """Clean all security groups which are not owned by an existing tenant
+
+ It lists all orphan security groups in use as debug to avoid
+ misunderstanding the testcase results (it could happen if cloud admin
+ removes accounts without cleaning the virtual machines)
+ """
+ sec_groups = self.orig_cloud.list_security_groups()
+ for sec_group in sec_groups:
+ if not sec_group.tenant_id:
+ continue
+ if not self.orig_cloud.get_project(sec_group.tenant_id):
+ self.__logger.debug("Cleaning security group %s", sec_group.id)
+ try:
+ self.orig_cloud.delete_security_group(sec_group.id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.debug(
+ "Orphan security group %s in use", sec_group.id)
+
+ def count_hypervisors(self):
+ """Count hypervisors."""
+ if env.get('SKIP_DOWN_HYPERVISORS').lower() == 'false':
+ return len(self.orig_cloud.list_hypervisors())
+ return self.count_active_hypervisors()
+
+ def count_active_hypervisors(self):
+ """Count all hypervisors which are up."""
+ compute_cnt = 0
+ for hypervisor in self.orig_cloud.list_hypervisors():
+ if hypervisor['state'] == 'up':
+ compute_cnt += 1
+ else:
+ self.__logger.warning(
+ "%s is down", hypervisor['hypervisor_hostname'])
+ return compute_cnt
+
+ def run(self, **kwargs):
+ """Boot the new VM
+
+ Here are the main actions:
+ - publish the image
+ - create the flavor
+
+ Returns:
+ - TestCase.EX_OK
+ - TestCase.EX_RUN_ERROR on error
+ """
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ assert super().run(
+ **kwargs) == testcase.TestCase.EX_OK
+ self.image = self.publish_image()
+ self.flavor = self.create_flavor()
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ self.result = 0
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def clean(self):
+ try:
+ assert self.orig_cloud
+ assert self.cloud
+ super().clean()
+ if self.image:
+ self.cloud.delete_image(self.image.id)
+ if self.flavor:
+ self.orig_cloud.delete_flavor(self.flavor.id)
+ if env.get('CLEAN_ORPHAN_SECURITY_GROUPS').lower() == 'true':
+ self.clean_orphan_security_groups()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class VmReady2(VmReady1):
+ """Deploy a single VM reachable via ssh (scenario2)
+
+ It creates new user/project before creating and configuring all tenant
+ network resources, flavors, images, etc. required by advanced testcases.
+
+ It ensures that all testcases inheriting from SingleVm2 could work
+ without specific configurations (or at least read the same config data).
+ """
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'vmready2'
+ super().__init__(**kwargs)
+ try:
+ assert self.orig_cloud
+ self.project = tenantnetwork.NewProject(
+ self.orig_cloud, self.case_name, self.guid)
+ self.project.create()
+ self.cloud = self.project.cloud
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create user or project")
+ self.cloud = None
+ self.project = None
+
+ def clean(self):
+ try:
+ super().clean()
+ assert self.project
+ self.project.clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class SingleVm1(VmReady1):
+ """Deploy a single VM reachable via ssh (scenario1)
+
+ It inherits from TenantNetwork1 which creates all network resources and
+ completes it by booting a VM attached to that network.
+
+ It ensures that all testcases inheriting from SingleVm1 could work
+ without specific configurations (or at least read the same config data).
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ __logger = logging.getLogger(__name__)
+ username = 'cirros'
+ ssh_connect_timeout = 1
+ ssh_connect_loops = 6
+ create_floating_ip_timeout = 120
+ check_console_loop = 6
+ check_console_regex = ' login: '
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'singlevm1'
+ super().__init__(**kwargs)
+ self.sshvm = None
+ self.sec = None
+ self.fip = None
+ self.keypair = None
+ self.ssh = None
+ (_, self.key_filename) = tempfile.mkstemp()
+
+ def prepare(self):
+ """Create the security group and the keypair
+
+ It can be overriden to set other rules according to the services
+ running in the VM
+
+ Raises: Exception on error
+ """
+ assert self.cloud
+ self.keypair = self.cloud.create_keypair(
+ f'{self.case_name}-kp_{self.guid}')
+ self.__logger.debug("keypair: %s", self.keypair)
+ self.__logger.debug("private_key:\n%s", self.keypair.private_key)
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
+ private_key_file.write(self.keypair.private_key)
+ self.sec = self.cloud.create_security_group(
+ f'{self.case_name}-sg_{self.guid}',
+ f'created by OPNFV Functest ({self.case_name})')
+ self.cloud.create_security_group_rule(
+ self.sec.id, port_range_min='22', port_range_max='22',
+ protocol='tcp', direction='ingress')
+ self.cloud.create_security_group_rule(
+ self.sec.id, protocol='icmp', direction='ingress')
+
+ def connect(self, vm1):
+ """Connect to a virtual machine via ssh
+
+ It first adds a floating ip to the virtual machine and then establishes
+ the ssh connection.
+
+ Returns:
+ - (fip, ssh)
+ - None on error
+ """
+ assert vm1
+ fip = None
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ fip = self.cloud.create_floating_ip(
+ network=self.ext_net.id, server=vm1, wait=True,
+ timeout=self.create_floating_ip_timeout)
+ self.__logger.debug("floating_ip: %s", fip)
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
+ for loop in range(self.ssh_connect_loops):
+ try:
+ p_console = self.cloud.get_server_console(vm1)
+ self.__logger.debug("vm console: \n%s", p_console)
+ ssh.connect(
+ fip.floating_ip_address if fip else vm1.public_v4,
+ username=getattr(
+ config.CONF,
+ f'{self.case_name}_image_user', self.username),
+ key_filename=self.key_filename,
+ timeout=getattr(
+ config.CONF,
+ f'{self.case_name}_vm_ssh_connect_timeout',
+ self.ssh_connect_timeout))
+ break
+ except Exception as exc: # pylint: disable=broad-except
+ self.__logger.debug(
+ "try %s: cannot connect to %s: %s", loop + 1,
+ fip.floating_ip_address if fip else vm1.public_v4, exc)
+ time.sleep(9)
+ else:
+ self.__logger.error(
+ "cannot connect to %s", fip.floating_ip_address)
+ return None
+ return (fip, ssh)
+
+ def execute(self):
+ """Say hello world via ssh
+
+ It can be overriden to execute any command.
+
+ Returns: echo exit codes
+ """
+ (_, stdout, stderr) = self.ssh.exec_command('echo Hello World')
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+ return stdout.channel.recv_exit_status()
+
+ def run(self, **kwargs):
+ """Boot the new VM
+
+ Here are the main actions:
+ - add a new ssh key
+ - boot the VM
+ - create the security group
+ - execute the right command over ssh
+
+ Returns:
+ - TestCase.EX_OK
+ - TestCase.EX_RUN_ERROR on error
+ """
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ assert super().run(
+ **kwargs) == testcase.TestCase.EX_OK
+ self.result = 0
+ self.prepare()
+ self.sshvm = self.boot_vm(
+ key_name=self.keypair.id, security_groups=[self.sec.id])
+ if self.check_regex_in_console(
+ self.sshvm.name, regex=self.check_console_regex,
+ loop=self.check_console_loop):
+ (self.fip, self.ssh) = self.connect(self.sshvm)
+ if not self.execute():
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def clean(self):
+ try:
+ assert self.orig_cloud
+ assert self.cloud
+ if self.fip:
+ self.cloud.delete_floating_ip(self.fip.id)
+ if self.sshvm:
+ self.cloud.delete_server(self.sshvm, wait=True)
+ if self.sec:
+ self.cloud.delete_security_group(self.sec.id)
+ if self.keypair:
+ self.cloud.delete_keypair(self.keypair.name)
+ super().clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class SingleVm2(SingleVm1):
+ """Deploy a single VM reachable via ssh (scenario2)
+
+ It creates new user/project before creating and configuring all tenant
+ network resources and vms required by advanced testcases.
+
+ It ensures that all testcases inheriting from SingleVm2 could work
+ without specific configurations (or at least read the same config data).
+ """
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'singlevm2'
+ super().__init__(**kwargs)
+ try:
+ assert self.orig_cloud
+ self.project = tenantnetwork.NewProject(
+ self.orig_cloud, self.case_name, self.guid)
+ self.project.create()
+ self.cloud = self.project.cloud
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create user or project")
+ self.cloud = None
+ self.project = None
+
+ def clean(self):
+ try:
+ super().clean()
+ assert self.project
+ self.project.clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
diff --git a/functest/core/tenantnetwork.py b/functest/core/tenantnetwork.py
new file mode 100644
index 000000000..3670dbe8a
--- /dev/null
+++ b/functest/core/tenantnetwork.py
@@ -0,0 +1,326 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Ease deploying tenant networks
+
+It offers a simple way to create all tenant network resources required by a
+testcase (including all Functest ones):
+
+ - TenantNetwork1 selects the user and the project set as env vars
+ - TenantNetwork2 creates a user and project to isolate the same resources
+
+This classes could be reused by more complexed scenarios (Single VM)
+"""
+
+import logging
+import os
+import time
+import uuid
+
+import os_client_config
+import shade
+from tempest.lib.common.utils import data_utils
+from xtesting.core import testcase
+
+from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
+
+
+class NewProject():
+ """Ease creating new projects/users"""
+ # pylint: disable=too-many-instance-attributes
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, cloud, case_name, guid):
+ self.cloud = None
+ self.orig_cloud = cloud
+ self.case_name = case_name
+ self.guid = guid
+ self.project = None
+ self.user = None
+ self.password = None
+ self.domain = None
+ self.role_name = None
+ self.default_member = env.get('NEW_USER_ROLE')
+
+ def create(self):
+ """Create projects/users"""
+ assert self.orig_cloud
+ assert self.case_name
+ self.password = data_utils.rand_password().replace('%', '!')
+ self.__logger.debug("password: %s", self.password)
+ self.domain = self.orig_cloud.get_domain(
+ name_or_id=self.orig_cloud.auth.get(
+ "project_domain_name", "Default"))
+ self.project = self.orig_cloud.create_project(
+ name=f'{self.case_name[:18]}-project_{self.guid}',
+ description=f"Created by OPNFV Functest: {self.case_name}",
+ domain_id=self.domain.id)
+ self.__logger.debug("project: %s", self.project)
+ self.user = self.orig_cloud.create_user(
+ name=f'{self.case_name}-user_{self.guid}',
+ password=self.password,
+ domain_id=self.domain.id)
+ self.__logger.debug("user: %s", self.user)
+ try:
+ if self.orig_cloud.get_role(self.default_member):
+ self.role_name = self.default_member
+ elif self.orig_cloud.get_role(self.default_member.lower()):
+ self.role_name = self.default_member.lower()
+ else:
+ raise Exception(f"Cannot detect {self.default_member}")
+ except Exception: # pylint: disable=broad-except
+ self.__logger.info("Creating default role %s", self.default_member)
+ role = self.orig_cloud.create_role(self.default_member)
+ self.role_name = role.name
+ self.__logger.debug("role: %s", role)
+ self.orig_cloud.grant_role(
+ self.role_name, user=self.user.id, project=self.project.id,
+ domain=self.domain.id)
+ osconfig = os_client_config.config.OpenStackConfig()
+ osconfig.cloud_config[
+ 'clouds']['envvars']['project_name'] = self.project.name
+ osconfig.cloud_config[
+ 'clouds']['envvars']['project_id'] = self.project.id
+ osconfig.cloud_config['clouds']['envvars']['username'] = self.user.name
+ osconfig.cloud_config['clouds']['envvars']['password'] = self.password
+ self.__logger.debug("cloud_config %s", osconfig.cloud_config)
+ self.cloud = shade.OpenStackCloud(
+ cloud_config=osconfig.get_one_cloud())
+ self.__logger.debug("new cloud %s", self.cloud.auth)
+
+ def get_environ(self):
+ "Get new environ"
+ environ = dict(
+ os.environ,
+ OS_USERNAME=self.user.name,
+ OS_PROJECT_NAME=self.project.name,
+ OS_PROJECT_ID=self.project.id,
+ OS_PASSWORD=self.password)
+ try:
+ del environ['OS_TENANT_NAME']
+ del environ['OS_TENANT_ID']
+ except Exception: # pylint: disable=broad-except
+ pass
+ return environ
+
+ def clean(self):
+ """Remove projects/users"""
+ try:
+ assert self.orig_cloud
+ if self.user:
+ self.orig_cloud.delete_user(self.user.id)
+ if self.project:
+ self.orig_cloud.delete_project(self.project.id)
+ secgroups = self.orig_cloud.list_security_groups(
+ filters={'name': 'default',
+ 'project_id': self.project.id})
+ if secgroups:
+ sec_id = secgroups[0].id
+ self.orig_cloud.delete_security_group(sec_id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class TenantNetwork1(testcase.TestCase):
+ # pylint: disable=too-many-instance-attributes
+ """Create a tenant network (scenario1)
+
+ It creates and configures all tenant network resources required by
+ advanced testcases (subnet, network and router).
+
+ It ensures that all testcases inheriting from TenantNetwork1 could work
+ without network specific configurations (or at least read the same config
+ data).
+ """
+
+ __logger = logging.getLogger(__name__)
+ cidr = '192.168.120.0/24'
+ shared_network = False
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'tenantnetwork1'
+ super().__init__(**kwargs)
+ self.dir_results = os.path.join(getattr(config.CONF, 'dir_results'))
+ self.res_dir = os.path.join(self.dir_results, self.case_name)
+ self.output_log_name = 'functest.log'
+ self.output_debug_log_name = 'functest.debug.log'
+ self.ext_net = None
+ try:
+ cloud_config = os_client_config.get_config()
+ self.cloud = self.orig_cloud = shade.OpenStackCloud(
+ cloud_config=cloud_config)
+ except Exception: # pylint: disable=broad-except
+ self.cloud = self.orig_cloud = None
+ self.__logger.exception("Cannot connect to Cloud")
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ try:
+ self.ext_net = self.get_external_network(self.cloud)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot get the external network")
+ self.guid = str(uuid.uuid4())
+ self.network = None
+ self.subnet = None
+ self.router = None
+
+ @staticmethod
+ def get_external_network(cloud):
+ """
+ Return the configured external network name or
+ the first retrieved external network name
+ """
+ assert cloud
+ if env.get("EXTERNAL_NETWORK"):
+ network = cloud.get_network(
+ env.get("EXTERNAL_NETWORK"), {"router:external": True})
+ if network:
+ return network
+ networks = cloud.list_networks({"router:external": True})
+ if networks:
+ return networks[0]
+ return None
+
+ @staticmethod
+ def get_default_role(cloud, member="Member"):
+ """Get the default role
+
+ It also tests the role in lowercase to avoid possible conflicts.
+ """
+ role = cloud.get_role(member)
+ if not role:
+ role = cloud.get_role(member.lower())
+ return role
+
+ @staticmethod
+ def get_public_auth_url(cloud):
+ """Get Keystone public endpoint"""
+ keystone_id = functest_utils.search_services(cloud, 'keystone')[0].id
+ endpoint = cloud.search_endpoints(
+ filters={'interface': 'public',
+ 'service_id': keystone_id})[0].url
+ return endpoint
+
+ def create_network_resources(self):
+ """Create all tenant network resources
+
+ It creates a router which gateway is the external network detected.
+ The new subnet is attached to that router.
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ assert self.ext_net
+ provider = {}
+ if hasattr(config.CONF, f'{self.case_name}_network_type'):
+ provider["network_type"] = getattr(
+ config.CONF, f'{self.case_name}_network_type')
+ if hasattr(config.CONF, f'{self.case_name}_physical_network'):
+ provider["physical_network"] = getattr(
+ config.CONF, f'{self.case_name}_physical_network')
+ if hasattr(config.CONF, f'{self.case_name}_segmentation_id'):
+ provider["segmentation_id"] = getattr(
+ config.CONF, f'{self.case_name}_segmentation_id')
+ domain = self.orig_cloud.get_domain(
+ name_or_id=self.orig_cloud.auth.get(
+ "project_domain_name", "Default"))
+ project = self.orig_cloud.get_project(
+ self.cloud.auth['project_name'],
+ domain_id=domain.id)
+ self.network = self.orig_cloud.create_network(
+ f'{self.case_name}-net_{self.guid}',
+ provider=provider, project_id=project.id,
+ shared=self.shared_network)
+ self.__logger.debug("network: %s", self.network)
+
+ self.subnet = self.cloud.create_subnet(
+ self.network.id,
+ subnet_name=f'{self.case_name}-subnet_{self.guid}',
+ cidr=getattr(
+ config.CONF, f'{self.case_name}_private_subnet_cidr',
+ self.cidr),
+ enable_dhcp=True,
+ dns_nameservers=[env.get('NAMESERVER')])
+ self.__logger.debug("subnet: %s", self.subnet)
+
+ self.router = self.cloud.create_router(
+ name=f'{self.case_name}-router_{self.guid}',
+ ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
+ self.__logger.debug("router: %s", self.router)
+ self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)
+
+ def run(self, **kwargs):
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ self.start_time = time.time()
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ self.create_network_resources()
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def clean(self):
+ try:
+ assert self.cloud
+ if self.router:
+ if self.subnet:
+ self.cloud.remove_router_interface(
+ self.router, self.subnet.id)
+ self.cloud.delete_router(self.router.id)
+ if self.subnet:
+ self.cloud.delete_subnet(self.subnet.id)
+ if self.network:
+ self.cloud.delete_network(self.network.id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("cannot clean all resources")
+
+
+class TenantNetwork2(TenantNetwork1):
+ """Create a tenant network (scenario2)
+
+ It creates new user/project before creating and configuring all tenant
+ network resources required by a testcase (subnet, network and router).
+
+ It ensures that all testcases inheriting from TenantNetwork2 could work
+ without network specific configurations (or at least read the same config
+ data).
+ """
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'tenantnetwork2'
+ super().__init__(**kwargs)
+ try:
+ assert self.cloud
+ self.project = NewProject(
+ self.cloud, self.case_name, self.guid)
+ self.project.create()
+ self.cloud = self.project.cloud
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create user or project")
+ self.cloud = None
+ self.project = None
+
+ def clean(self):
+ try:
+ super().clean()
+ assert self.project
+ self.project.clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
diff --git a/functest/core/vnf.py b/functest/core/vnf.py
deleted file mode 100644
index a6afd4e6b..000000000
--- a/functest/core/vnf.py
+++ /dev/null
@@ -1,187 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class of all VNF TestCases."""
-
-import logging
-import uuid
-
-from snaps.config.user import UserConfig
-from snaps.config.project import ProjectConfig
-from snaps.openstack.create_user import OpenStackUser
-from snaps.openstack.create_project import OpenStackProject
-from snaps.openstack.utils import keystone_utils
-from snaps.openstack.tests import openstack_tests
-
-from xtesting.core import vnf
-from functest.utils import constants
-
-__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, "
- "Valentin Boucher <valentin.boucher@orange.com>")
-
-
-class VnfPreparationException(vnf.VnfPreparationException):
- """Raise when VNF preparation cannot be executed."""
-
-
-class OrchestratorDeploymentException(vnf.OrchestratorDeploymentException):
- """Raise when orchestrator cannot be deployed."""
-
-
-class VnfDeploymentException(vnf.VnfDeploymentException):
- """Raise when VNF cannot be deployed."""
-
-
-class VnfTestException(vnf.VnfTestException):
- """Raise when VNF cannot be tested."""
-
-
-class VnfOnBoarding(vnf.VnfOnBoarding):
- # pylint: disable=too-many-instance-attributes
- """Base model for OpenStack VNF test cases."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- super(VnfOnBoarding, self).__init__(**kwargs)
- self.uuid = uuid.uuid4()
- self.user_name = "{}-{}".format(self.case_name, self.uuid)
- self.tenant_name = "{}-{}".format(self.case_name, self.uuid)
- self.snaps_creds = {}
- self.created_object = []
- self.os_project = None
- self.tenant_description = "Created by OPNFV Functest: {}".format(
- self.case_name)
-
- def prepare(self):
- """
- Prepare the environment for VNF testing:
-
- * Creation of a user,
- * Creation of a tenant,
- * Allocation admin role to the user on this tenant
-
- Returns base.TestCase.EX_OK if preparation is successfull
-
- Raise VnfPreparationException in case of problem
- """
- try:
- self.__logger.info(
- "Prepare VNF: %s, description: %s", self.case_name,
- self.tenant_description)
- snaps_creds = openstack_tests.get_credentials(
- os_env_file=constants.ENV_FILE)
-
- self.os_project = OpenStackProject(
- snaps_creds,
- ProjectConfig(
- name=self.tenant_name,
- description=self.tenant_description,
- domain=snaps_creds.project_domain_name
- ))
- self.os_project.create()
- self.created_object.append(self.os_project)
-
- snaps_creds.project_domain_id = \
- self.os_project.get_project().domain_id
- snaps_creds.user_domain_id = \
- self.os_project.get_project().domain_id
-
- for role in ['admin', 'Admin']:
- if keystone_utils.get_role_by_name(
- keystone_utils.keystone_client(snaps_creds), role):
- admin_role = role
- break
-
- user_creator = OpenStackUser(
- snaps_creds,
- UserConfig(
- name=self.user_name,
- password=str(uuid.uuid4()),
- project_name=self.tenant_name,
- domain_name=snaps_creds.user_domain_name,
- roles={admin_role: self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
- self.snaps_creds = user_creator.get_os_creds(self.tenant_name)
- self.__logger.debug("snaps creds: %s", self.snaps_creds)
-
- return vnf.VnfOnBoarding.EX_OK
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Exception raised during VNF preparation")
- raise VnfPreparationException
-
- def deploy_orchestrator(self):
- """
- Deploy an orchestrator (optional).
-
- If this method is overriden then raise orchestratorDeploymentException
- if error during orchestrator deployment
- """
- self.__logger.info("Deploy orchestrator (if necessary)")
- return True
-
- def deploy_vnf(self):
- """
- Deploy the VNF
-
- This function MUST be implemented by vnf test cases.
- The details section MAY be updated in the vnf test cases.
-
- The deployment can be executed via a specific orchestrator
- or using build-in orchestrators such as heat, OpenBaton, cloudify,
- juju, onap, ...
-
- Returns:
- True if the VNF is properly deployed
- False if the VNF is not deployed
-
- Raise VnfDeploymentException if error during VNF deployment
- """
- self.__logger.error("VNF must be deployed")
- raise VnfDeploymentException
-
- def test_vnf(self):
- """
- Test the VNF
-
- This function MUST be implemented by vnf test cases.
- The details section MAY be updated in the vnf test cases.
-
- Once a VNF is deployed, it is assumed that specific test suite can be
- run to validate the VNF.
- Please note that the same test suite can be used on several test case
- (e.g. clearwater test suite can be used whatever the orchestrator used
- for the deployment)
-
- Returns:
- True if VNF tests are PASS
- False if test suite is FAIL
-
- Raise VnfTestException if error during VNF test
- """
- self.__logger.error("VNF must be tested")
- raise VnfTestException
-
- def clean(self):
- """
- Clean VNF test case.
-
- It is up to the test providers to delete resources used for the tests.
- By default we clean:
-
- * the user,
- * the tenant
- """
- self.__logger.info('Removing the VNF resources ..')
- for creator in reversed(self.created_object):
- try:
- creator.clean()
- except Exception as exc: # pylint: disable=broad-except
- self.__logger.error('Unexpected error cleaning - %s', exc)