diff options
Diffstat (limited to 'functest/core')
-rw-r--r-- | functest/core/cloudify.py | 219 | ||||
-rw-r--r-- | functest/core/feature.py | 134 | ||||
-rw-r--r-- | functest/core/robotframework.py | 125 | ||||
-rw-r--r-- | functest/core/singlevm.py | 549 | ||||
-rw-r--r-- | functest/core/tenantnetwork.py | 326 | ||||
-rw-r--r-- | functest/core/testcase.py | 186 | ||||
-rw-r--r-- | functest/core/unit.py | 92 | ||||
-rw-r--r-- | functest/core/vnf.py | 206 |
8 files changed, 1094 insertions, 743 deletions
diff --git a/functest/core/cloudify.py b/functest/core/cloudify.py new file mode 100644 index 000000000..966d33645 --- /dev/null +++ b/functest/core/cloudify.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Cloudify testcase implementation.""" + +from __future__ import division + +import logging +import os +import time +import traceback + +from cloudify_rest_client import CloudifyClient +from cloudify_rest_client.executions import Execution +import scp + +from functest.core import singlevm + + +class Cloudify(singlevm.SingleVm2): + """Cloudify Orchestrator Case.""" + + __logger = logging.getLogger(__name__) + + filename = ('/home/opnfv/functest/images/' + 'ubuntu-18.04-server-cloudimg-amd64.img') + flavor_ram = 4096 + flavor_vcpus = 2 + flavor_disk = 40 + username = 'ubuntu' + ssh_connect_loops = 12 + create_server_timeout = 600 + ports = [80, 443, 5671, 53333] + + cloudify_archive = ('/home/opnfv/functest/images/' + 'cloudify-docker-manager-community-19.01.24.tar') + cloudify_container = "docker-cfy-manager:latest" + + def __init__(self, **kwargs): + """Initialize Cloudify testcase object.""" + if "case_name" not in kwargs: + kwargs["case_name"] = "cloudify" + super().__init__(**kwargs) + self.cfy_client = None + + def prepare(self): + super().prepare() + for port in self.ports: + self.cloud.create_security_group_rule( + self.sec.id, port_range_min=port, port_range_max=port, + protocol='tcp', direction='ingress') + + def execute(self): + """ + Deploy Cloudify Manager. + """ + scpc = scp.SCPClient(self.ssh.get_transport()) + scpc.put(self.cloudify_archive, + remote_path=os.path.basename(self.cloudify_archive)) + (_, stdout, stderr) = self.ssh.exec_command( + "sudo apt-get update && " + "sudo apt-get install -y docker.io && " + "sudo docker load -i " + f"~/{os.path.basename(self.cloudify_archive)} && " + "sudo docker run --name cfy_manager_local -d " + "--restart unless-stopped -v /sys/fs/cgroup:/sys/fs/cgroup:ro " + "--tmpfs /run --tmpfs /run/lock --security-opt seccomp:unconfined " + f"--cap-add SYS_ADMIN --network=host {self.cloudify_container}") + self.__logger.debug("output:\n%s", stdout.read().decode("utf-8")) + self.__logger.debug("error:\n%s", stderr.read().decode("utf-8")) + self.cfy_client = CloudifyClient( + host=self.fip.floating_ip_address if self.fip else ( + self.sshvm.public_v4), + username='admin', password='admin', tenant='default_tenant') + self.__logger.info("Attemps running status of the Manager") + secret_key = "foo" + secret_value = "bar" + for loop in range(20): + try: + self.__logger.debug( + "status %s", self.cfy_client.manager.get_status()) + cfy_status = self.cfy_client.manager.get_status()['status'] + self.__logger.info( + "The current manager status is %s", cfy_status) + if str(cfy_status) != 'running': + raise Exception("Cloudify Manager isn't up and running") + for secret in iter(self.cfy_client.secrets.list()): + if secret_key == secret["key"]: + self.__logger.debug("Updating secrets: %s", secret_key) + self.cfy_client.secrets.update( + secret_key, secret_value) + break + else: + self.__logger.debug("Creating secrets: %s", secret_key) + self.cfy_client.secrets.create(secret_key, secret_value) + self.cfy_client.secrets.delete(secret_key) + self.__logger.info("Secrets API successfully reached") + break + except Exception: # pylint: disable=broad-except + self.__logger.debug( + "try %s: Cloudify Manager isn't up and running \n%s", + loop + 1, traceback.format_exc()) + time.sleep(30) + else: + self.__logger.error("Cloudify Manager isn't up and running") + return 1 + self.__logger.info("Cloudify Manager is up and running") + return 0 + + def put_private_key(self): + """Put private keypair in manager""" + self.__logger.info("Put private keypair in manager") + scpc = scp.SCPClient(self.ssh.get_transport()) + scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem') + (_, stdout, stderr) = self.ssh.exec_command( + "sudo docker cp ~/cloudify_ims.pem " + "cfy_manager_local:/etc/cloudify/ && " + "sudo docker exec cfy_manager_local " + "chmod 444 /etc/cloudify/cloudify_ims.pem") + self.__logger.debug("output:\n%s", stdout.read().decode("utf-8")) + self.__logger.debug("error:\n%s", stderr.read().decode("utf-8")) + + def upload_cfy_plugins(self, yaml, wgn): + """Upload Cloudify plugins""" + (_, stdout, stderr) = self.ssh.exec_command( + "sudo docker exec cfy_manager_local " + f"cfy plugins upload -y {yaml} {wgn} && " + "sudo docker exec cfy_manager_local cfy status") + self.__logger.debug("output:\n%s", stdout.read().decode("utf-8")) + self.__logger.debug("error:\n%s", stderr.read().decode("utf-8")) + + def kill_existing_execution(self, dep_name): + """kill existing execution""" + try: + self.__logger.info('Deleting the current deployment') + exec_list = self.cfy_client.executions.list() + for execution in exec_list: + if execution['status'] == "started": + try: + self.cfy_client.executions.cancel( + execution['id'], force=True) + except Exception: # pylint: disable=broad-except + self.__logger.warning("Can't cancel the current exec") + execution = self.cfy_client.executions.start( + dep_name, 'uninstall', parameters=dict(ignore_failure=True)) + wait_for_execution(self.cfy_client, execution, self.__logger) + self.cfy_client.deployments.delete(dep_name) + time.sleep(10) + self.cfy_client.blueprints.delete(dep_name) + except Exception: # pylint: disable=broad-except + self.__logger.exception("Some issue during the undeployment ..") + + +def wait_for_execution(client, execution, logger, timeout=3600, ): + """Wait for a workflow execution on Cloudify Manager.""" + # if execution already ended - return without waiting + if execution.status in Execution.END_STATES: + return execution + + if timeout is not None: + deadline = time.time() + timeout + + # Poll for execution status and execution logs, until execution ends + # and we receive an event of type in WORKFLOW_END_TYPES + offset = 0 + batch_size = 50 + event_list = [] + execution_ended = False + while True: + event_list = client.events.list( + execution_id=execution.id, + _offset=offset, + _size=batch_size, + include_logs=True, + sort='@timestamp').items + + offset = offset + len(event_list) + for event in event_list: + logger.debug(event.get('message')) + + if timeout is not None: + if time.time() > deadline: + raise RuntimeError( + 'execution of operation {execution.workflow_id} for ' + 'deployment {execution.deployment_id} timed out') + # update the remaining timeout + timeout = deadline - time.time() + + if not execution_ended: + execution = client.executions.get(execution.id) + execution_ended = execution.status in Execution.END_STATES + + if execution_ended: + break + + time.sleep(5) + + return execution + + +def get_execution_id(client, deployment_id): + """ + Get the execution id of a env preparation. + + network, security group, fip, VM creation + """ + executions = client.executions.list(deployment_id=deployment_id) + for execution in executions: + if execution.workflow_id == 'create_deployment_environment': + return execution + raise RuntimeError('Failed to get create_deployment_environment ' + 'workflow execution.' + f'Available executions: {executions}') diff --git a/functest/core/feature.py b/functest/core/feature.py deleted file mode 100644 index 3200dad85..000000000 --- a/functest/core/feature.py +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 ZTE Corp and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent classes of all Functest Features. - -Feature is considered as TestCase offered by Third-party. It offers -helpers to run any python method or any bash command. -""" - -import logging -import subprocess -import time - -import functest.core.testcase as base -from functest.utils.constants import CONST - -__author__ = ("Serena Feng <feng.xiaowei@zte.com.cn>, " - "Cedric Ollivier <cedric.ollivier@orange.com>") - - -class Feature(base.TestCase): - """Base model for single feature.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(Feature, self).__init__(**kwargs) - self.result_file = "{}/{}.log".format( - CONST.__getattribute__('dir_results'), self.case_name) - try: - module = kwargs['run']['module'] - self.logger = logging.getLogger(module) - except KeyError: - self.__logger.warning( - "Cannot get module name %s. Using %s as fallback", - kwargs, self.case_name) - self.logger = logging.getLogger(self.case_name) - handler = logging.StreamHandler() - handler.setLevel(logging.WARN) - self.logger.addHandler(handler) - handler = logging.FileHandler(self.result_file) - handler.setLevel(logging.DEBUG) - self.logger.addHandler(handler) - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - - def execute(self, **kwargs): - """Execute the Python method. - - The subclasses must override the default implementation which - is false on purpose. - - The new implementation must return 0 if success or anything - else if failure. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - -1. - """ - # pylint: disable=unused-argument,no-self-use - return -1 - - def run(self, **kwargs): - """Run the feature. - - It allows executing any Python method by calling execute(). - - It sets the following attributes required to push the results - to DB: - - * result, - * start_time, - * stop_time. - - It doesn't fulfill details when pushing the results to the DB. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_OK if execute() returns 0, - TestCase.EX_RUN_ERROR otherwise. - """ - self.start_time = time.time() - exit_code = base.TestCase.EX_RUN_ERROR - self.result = 0 - try: - if self.execute(**kwargs) == 0: - exit_code = base.TestCase.EX_OK - self.result = 100 - except Exception: # pylint: disable=broad-except - self.__logger.exception("%s FAILED", self.project_name) - self.__logger.info("Test result is stored in '%s'", self.result_file) - self.stop_time = time.time() - return exit_code - - -class BashFeature(Feature): - """Class designed to run any bash command.""" - - __logger = logging.getLogger(__name__) - - def execute(self, **kwargs): - """Execute the cmd passed as arg - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - 0 if cmd returns 0, - -1 otherwise. - """ - ret = -1 - try: - cmd = kwargs["cmd"] - with open(self.result_file, 'w+') as f_stdout: - proc = subprocess.Popen(cmd.split(), stdout=f_stdout, - stderr=subprocess.STDOUT) - ret = proc.wait() - if ret != 0: - self.__logger.error("Execute command: %s failed", cmd) - except KeyError: - self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs) - return ret diff --git a/functest/core/robotframework.py b/functest/core/robotframework.py deleted file mode 100644 index 689d9d946..000000000 --- a/functest/core/robotframework.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define classes required to run any Robot suites.""" - -from __future__ import division - -import errno -import logging -import os - -import robot.api -from robot.errors import RobotError -import robot.run -from robot.utils.robottime import timestamp_to_secs -from six import StringIO - -from functest.core import testcase -from functest.utils import constants - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - - -class ResultVisitor(robot.api.ResultVisitor): - """Visitor to get result details.""" - - def __init__(self): - self._data = [] - - def visit_test(self, test): - output = {} - output['name'] = test.name - output['parent'] = test.parent.name - output['status'] = test.status - output['starttime'] = test.starttime - output['endtime'] = test.endtime - output['critical'] = test.critical - output['text'] = test.message - output['elapsedtime'] = test.elapsedtime - self._data.append(output) - - def get_data(self): - """Get the details of the result.""" - return self._data - - -class RobotFramework(testcase.TestCase): - """RobotFramework runner.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - self.res_dir = os.path.join( - constants.CONST.__getattribute__('dir_results'), 'robot') - self.xml_file = os.path.join(self.res_dir, 'output.xml') - super(RobotFramework, self).__init__(**kwargs) - - def parse_results(self): - """Parse output.xml and get the details in it.""" - result = robot.api.ExecutionResult(self.xml_file) - visitor = ResultVisitor() - result.visit(visitor) - try: - self.result = 100 * ( - result.suite.statistics.critical.passed / - result.suite.statistics.critical.total) - except ZeroDivisionError: - self.__logger.error("No test has been run") - self.start_time = timestamp_to_secs(result.suite.starttime) - self.stop_time = timestamp_to_secs(result.suite.endtime) - self.details = {} - self.details['description'] = result.suite.name - self.details['tests'] = visitor.get_data() - - def run(self, **kwargs): - """Run the RobotFramework suites - - Here are the steps: - * create the output directories if required, - * get the results in output.xml, - * delete temporary files. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - EX_OK if all suites ran well. - EX_RUN_ERROR otherwise. - """ - try: - suites = kwargs["suites"] - variable = kwargs.get("variable", []) - except KeyError: - self.__logger.exception("Mandatory args were not passed") - return self.EX_RUN_ERROR - try: - os.makedirs(self.res_dir) - except OSError as ex: - if ex.errno != errno.EEXIST: - self.__logger.exception("Cannot create %s", self.res_dir) - return self.EX_RUN_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("Cannot create %s", self.res_dir) - return self.EX_RUN_ERROR - stream = StringIO() - robot.run(*suites, variable=variable, output=self.xml_file, - log='NONE', report='NONE', stdout=stream) - self.__logger.info("\n" + stream.getvalue()) - self.__logger.info("Results were successfully generated") - try: - self.parse_results() - self.__logger.info("Results were successfully parsed") - except RobotError as ex: - self.__logger.error("Run suites before publishing: %s", ex.message) - return self.EX_RUN_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("Cannot parse results") - return self.EX_RUN_ERROR - return self.EX_OK diff --git a/functest/core/singlevm.py b/functest/core/singlevm.py new file mode 100644 index 000000000..4bce516d3 --- /dev/null +++ b/functest/core/singlevm.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Ease deploying a single VM reachable via ssh + +It offers a simple way to create all tenant network resources + a VM for +advanced testcases (e.g. deploying an orchestrator). +""" + +import logging +import re +import tempfile +import time + +import paramiko +from xtesting.core import testcase + +from functest.core import tenantnetwork +from functest.utils import config +from functest.utils import env +from functest.utils import functest_utils + + +class VmReady1(tenantnetwork.TenantNetwork1): + """Prepare a single VM (scenario1) + + It inherits from TenantNetwork1 which creates all network resources and + prepares a future VM attached to that network. + + It ensures that all testcases inheriting from SingleVm1 could work + without specific configurations (or at least read the same config data). + """ + # pylint: disable=too-many-instance-attributes + + __logger = logging.getLogger(__name__) + filename = '/home/opnfv/functest/images/cirros-0.6.1-x86_64-disk.img' + image_format = 'qcow2' + extra_properties = {} + filename_alt = filename + image_alt_format = image_format + extra_alt_properties = extra_properties + visibility = 'private' + flavor_ram = 512 + flavor_vcpus = 1 + flavor_disk = 1 + flavor_extra_specs = {} + flavor_alt_ram = 1024 + flavor_alt_vcpus = 1 + flavor_alt_disk = 1 + flavor_alt_extra_specs = flavor_extra_specs + create_server_timeout = 180 + + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs["case_name"] = 'vmready1' + super().__init__(**kwargs) + self.image = None + self.flavor = None + + def publish_image(self, name=None): + """Publish image + + It allows publishing multiple images for the child testcases. It forces + the same configuration for all subtestcases. + + Returns: image + + Raises: expection on error + """ + assert self.cloud + extra_properties = self.extra_properties.copy() + if env.get('IMAGE_PROPERTIES'): + extra_properties.update( + functest_utils.convert_ini_to_dict( + env.get('IMAGE_PROPERTIES'))) + extra_properties.update( + getattr(config.CONF, f'{self.case_name}_extra_properties', {})) + image = self.cloud.create_image( + name if name else f'{self.case_name}-img_{self.guid}', + filename=getattr( + config.CONF, f'{self.case_name}_image', + self.filename), + meta=extra_properties, + disk_format=getattr( + config.CONF, f'{self.case_name}_image_format', + self.image_format), + visibility=getattr( + config.CONF, f'{self.case_name}_visibility', + self.visibility), + wait=True) + self.__logger.debug("image: %s", image) + return image + + def publish_image_alt(self, name=None): + """Publish alternative image + + It allows publishing multiple images for the child testcases. It forces + the same configuration for all subtestcases. + + Returns: image + + Raises: expection on error + """ + assert self.cloud + extra_alt_properties = self.extra_alt_properties.copy() + if env.get('IMAGE_PROPERTIES'): + extra_alt_properties.update( + functest_utils.convert_ini_to_dict( + env.get('IMAGE_PROPERTIES'))) + extra_alt_properties.update( + getattr(config.CONF, f'{self.case_name}_extra_alt_properties', {})) + image = self.cloud.create_image( + name if name else f'{self.case_name}-img_alt_{self.guid}', + filename=getattr( + config.CONF, f'{self.case_name}_image_alt', + self.filename_alt), + meta=extra_alt_properties, + disk_format=getattr( + config.CONF, f'{self.case_name}_image_alt_format', + self.image_format), + visibility=getattr( + config.CONF, f'{self.case_name}_visibility', + self.visibility), + wait=True) + self.__logger.debug("image: %s", image) + return image + + def create_flavor(self, name=None): + """Create flavor + + It allows creating multiple flavors for the child testcases. It forces + the same configuration for all subtestcases. + + Returns: flavor + + Raises: expection on error + """ + assert self.orig_cloud + flavor = self.orig_cloud.create_flavor( + name if name else f'{self.case_name}-flavor_{self.guid}', + getattr(config.CONF, f'{self.case_name}_flavor_ram', + self.flavor_ram), + getattr(config.CONF, f'{self.case_name}_flavor_vcpus', + self.flavor_vcpus), + getattr(config.CONF, f'{self.case_name}_flavor_disk', + self.flavor_disk)) + self.__logger.debug("flavor: %s", flavor) + flavor_extra_specs = self.flavor_extra_specs.copy() + if env.get('FLAVOR_EXTRA_SPECS'): + flavor_extra_specs.update( + functest_utils.convert_ini_to_dict( + env.get('FLAVOR_EXTRA_SPECS'))) + flavor_extra_specs.update( + getattr(config.CONF, + f'{self.case_name}_flavor_extra_specs', {})) + self.orig_cloud.set_flavor_specs(flavor.id, flavor_extra_specs) + return flavor + + def create_flavor_alt(self, name=None): + """Create flavor + + It allows creating multiple alt flavors for the child testcases. It + forces the same configuration for all subtestcases. + + Returns: flavor + + Raises: expection on error + """ + assert self.orig_cloud + flavor = self.orig_cloud.create_flavor( + name if name else f'{self.case_name}-flavor_alt_{self.guid}', + getattr(config.CONF, f'{self.case_name}_flavor_alt_ram', + self.flavor_alt_ram), + getattr(config.CONF, f'{self.case_name}_flavor_alt_vcpus', + self.flavor_alt_vcpus), + getattr(config.CONF, f'{self.case_name}_flavor_alt_disk', + self.flavor_alt_disk)) + self.__logger.debug("flavor: %s", flavor) + flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy() + if env.get('FLAVOR_EXTRA_SPECS'): + flavor_alt_extra_specs.update( + functest_utils.convert_ini_to_dict( + env.get('FLAVOR_EXTRA_SPECS'))) + flavor_alt_extra_specs.update( + getattr(config.CONF, + f'{self.case_name}_flavor_alt_extra_specs', {})) + self.orig_cloud.set_flavor_specs( + flavor.id, flavor_alt_extra_specs) + return flavor + + def boot_vm(self, name=None, **kwargs): + """Boot the virtual machine + + It allows booting multiple machines for the child testcases. It forces + the same configuration for all subtestcases. + + Returns: vm + + Raises: expection on error + """ + assert self.cloud + vm1 = self.cloud.create_server( + name if name else f'{self.case_name}-vm_{self.guid}', + image=self.image.id, flavor=self.flavor.id, + auto_ip=False, + network=self.network.id if self.network else env.get( + "EXTERNAL_NETWORK"), + timeout=self.create_server_timeout, wait=True, **kwargs) + self.__logger.debug("vm: %s", vm1) + return vm1 + + def check_regex_in_console(self, name, regex=' login: ', loop=6): + """Wait for specific message in console + + Returns: True or False on errors + """ + assert self.cloud + for iloop in range(loop): + console = self.cloud.get_server_console(name) + self.__logger.debug("console: \n%s", console) + if re.search(regex, console): + self.__logger.debug( + "regex found: '%s' in console\n%s", regex, console) + return True + self.__logger.debug( + "try %s: cannot find regex '%s' in console\n%s", + iloop + 1, regex, console) + time.sleep(10) + self.__logger.error("cannot find regex '%s' in console", regex) + return False + + def clean_orphan_security_groups(self): + """Clean all security groups which are not owned by an existing tenant + + It lists all orphan security groups in use as debug to avoid + misunderstanding the testcase results (it could happen if cloud admin + removes accounts without cleaning the virtual machines) + """ + sec_groups = self.orig_cloud.list_security_groups() + for sec_group in sec_groups: + if not sec_group.tenant_id: + continue + if not self.orig_cloud.get_project(sec_group.tenant_id): + self.__logger.debug("Cleaning security group %s", sec_group.id) + try: + self.orig_cloud.delete_security_group(sec_group.id) + except Exception: # pylint: disable=broad-except + self.__logger.debug( + "Orphan security group %s in use", sec_group.id) + + def count_hypervisors(self): + """Count hypervisors.""" + if env.get('SKIP_DOWN_HYPERVISORS').lower() == 'false': + return len(self.orig_cloud.list_hypervisors()) + return self.count_active_hypervisors() + + def count_active_hypervisors(self): + """Count all hypervisors which are up.""" + compute_cnt = 0 + for hypervisor in self.orig_cloud.list_hypervisors(): + if hypervisor['state'] == 'up': + compute_cnt += 1 + else: + self.__logger.warning( + "%s is down", hypervisor['hypervisor_hostname']) + return compute_cnt + + def run(self, **kwargs): + """Boot the new VM + + Here are the main actions: + - publish the image + - create the flavor + + Returns: + - TestCase.EX_OK + - TestCase.EX_RUN_ERROR on error + """ + status = testcase.TestCase.EX_RUN_ERROR + try: + assert self.cloud + assert super().run( + **kwargs) == testcase.TestCase.EX_OK + self.image = self.publish_image() + self.flavor = self.create_flavor() + self.result = 100 + status = testcase.TestCase.EX_OK + except Exception: # pylint: disable=broad-except + self.__logger.exception('Cannot run %s', self.case_name) + self.result = 0 + finally: + self.stop_time = time.time() + return status + + def clean(self): + try: + assert self.orig_cloud + assert self.cloud + super().clean() + if self.image: + self.cloud.delete_image(self.image.id) + if self.flavor: + self.orig_cloud.delete_flavor(self.flavor.id) + if env.get('CLEAN_ORPHAN_SECURITY_GROUPS').lower() == 'true': + self.clean_orphan_security_groups() + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot clean all resources") + + +class VmReady2(VmReady1): + """Deploy a single VM reachable via ssh (scenario2) + + It creates new user/project before creating and configuring all tenant + network resources, flavors, images, etc. required by advanced testcases. + + It ensures that all testcases inheriting from SingleVm2 could work + without specific configurations (or at least read the same config data). + """ + + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs["case_name"] = 'vmready2' + super().__init__(**kwargs) + try: + assert self.orig_cloud + self.project = tenantnetwork.NewProject( + self.orig_cloud, self.case_name, self.guid) + self.project.create() + self.cloud = self.project.cloud + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot create user or project") + self.cloud = None + self.project = None + + def clean(self): + try: + super().clean() + assert self.project + self.project.clean() + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot clean all resources") + + +class SingleVm1(VmReady1): + """Deploy a single VM reachable via ssh (scenario1) + + It inherits from TenantNetwork1 which creates all network resources and + completes it by booting a VM attached to that network. + + It ensures that all testcases inheriting from SingleVm1 could work + without specific configurations (or at least read the same config data). + """ + # pylint: disable=too-many-instance-attributes + + __logger = logging.getLogger(__name__) + username = 'cirros' + ssh_connect_timeout = 1 + ssh_connect_loops = 6 + create_floating_ip_timeout = 120 + check_console_loop = 6 + check_console_regex = ' login: ' + + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs["case_name"] = 'singlevm1' + super().__init__(**kwargs) + self.sshvm = None + self.sec = None + self.fip = None + self.keypair = None + self.ssh = None + (_, self.key_filename) = tempfile.mkstemp() + + def prepare(self): + """Create the security group and the keypair + + It can be overriden to set other rules according to the services + running in the VM + + Raises: Exception on error + """ + assert self.cloud + self.keypair = self.cloud.create_keypair( + f'{self.case_name}-kp_{self.guid}') + self.__logger.debug("keypair: %s", self.keypair) + self.__logger.debug("private_key:\n%s", self.keypair.private_key) + with open( + self.key_filename, 'w', encoding='utf-8') as private_key_file: + private_key_file.write(self.keypair.private_key) + self.sec = self.cloud.create_security_group( + f'{self.case_name}-sg_{self.guid}', + f'created by OPNFV Functest ({self.case_name})') + self.cloud.create_security_group_rule( + self.sec.id, port_range_min='22', port_range_max='22', + protocol='tcp', direction='ingress') + self.cloud.create_security_group_rule( + self.sec.id, protocol='icmp', direction='ingress') + + def connect(self, vm1): + """Connect to a virtual machine via ssh + + It first adds a floating ip to the virtual machine and then establishes + the ssh connection. + + Returns: + - (fip, ssh) + - None on error + """ + assert vm1 + fip = None + if env.get('NO_TENANT_NETWORK').lower() != 'true': + fip = self.cloud.create_floating_ip( + network=self.ext_net.id, server=vm1, wait=True, + timeout=self.create_floating_ip_timeout) + self.__logger.debug("floating_ip: %s", fip) + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy()) + for loop in range(self.ssh_connect_loops): + try: + p_console = self.cloud.get_server_console(vm1) + self.__logger.debug("vm console: \n%s", p_console) + ssh.connect( + fip.floating_ip_address if fip else vm1.public_v4, + username=getattr( + config.CONF, + f'{self.case_name}_image_user', self.username), + key_filename=self.key_filename, + timeout=getattr( + config.CONF, + f'{self.case_name}_vm_ssh_connect_timeout', + self.ssh_connect_timeout)) + break + except Exception as exc: # pylint: disable=broad-except + self.__logger.debug( + "try %s: cannot connect to %s: %s", loop + 1, + fip.floating_ip_address if fip else vm1.public_v4, exc) + time.sleep(9) + else: + self.__logger.error( + "cannot connect to %s", fip.floating_ip_address) + return None + return (fip, ssh) + + def execute(self): + """Say hello world via ssh + + It can be overriden to execute any command. + + Returns: echo exit codes + """ + (_, stdout, stderr) = self.ssh.exec_command('echo Hello World') + self.__logger.debug("output:\n%s", stdout.read().decode("utf-8")) + self.__logger.debug("error:\n%s", stderr.read().decode("utf-8")) + return stdout.channel.recv_exit_status() + + def run(self, **kwargs): + """Boot the new VM + + Here are the main actions: + - add a new ssh key + - boot the VM + - create the security group + - execute the right command over ssh + + Returns: + - TestCase.EX_OK + - TestCase.EX_RUN_ERROR on error + """ + status = testcase.TestCase.EX_RUN_ERROR + try: + assert self.cloud + assert super().run( + **kwargs) == testcase.TestCase.EX_OK + self.result = 0 + self.prepare() + self.sshvm = self.boot_vm( + key_name=self.keypair.id, security_groups=[self.sec.id]) + if self.check_regex_in_console( + self.sshvm.name, regex=self.check_console_regex, + loop=self.check_console_loop): + (self.fip, self.ssh) = self.connect(self.sshvm) + if not self.execute(): + self.result = 100 + status = testcase.TestCase.EX_OK + except Exception: # pylint: disable=broad-except + self.__logger.exception('Cannot run %s', self.case_name) + finally: + self.stop_time = time.time() + return status + + def clean(self): + try: + assert self.orig_cloud + assert self.cloud + if self.fip: + self.cloud.delete_floating_ip(self.fip.id) + if self.sshvm: + self.cloud.delete_server(self.sshvm, wait=True) + if self.sec: + self.cloud.delete_security_group(self.sec.id) + if self.keypair: + self.cloud.delete_keypair(self.keypair.name) + super().clean() + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot clean all resources") + + +class SingleVm2(SingleVm1): + """Deploy a single VM reachable via ssh (scenario2) + + It creates new user/project before creating and configuring all tenant + network resources and vms required by advanced testcases. + + It ensures that all testcases inheriting from SingleVm2 could work + without specific configurations (or at least read the same config data). + """ + + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs["case_name"] = 'singlevm2' + super().__init__(**kwargs) + try: + assert self.orig_cloud + self.project = tenantnetwork.NewProject( + self.orig_cloud, self.case_name, self.guid) + self.project.create() + self.cloud = self.project.cloud + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot create user or project") + self.cloud = None + self.project = None + + def clean(self): + try: + super().clean() + assert self.project + self.project.clean() + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot clean all resources") diff --git a/functest/core/tenantnetwork.py b/functest/core/tenantnetwork.py new file mode 100644 index 000000000..3670dbe8a --- /dev/null +++ b/functest/core/tenantnetwork.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python + +# Copyright (c) 2018 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Ease deploying tenant networks + +It offers a simple way to create all tenant network resources required by a +testcase (including all Functest ones): + + - TenantNetwork1 selects the user and the project set as env vars + - TenantNetwork2 creates a user and project to isolate the same resources + +This classes could be reused by more complexed scenarios (Single VM) +""" + +import logging +import os +import time +import uuid + +import os_client_config +import shade +from tempest.lib.common.utils import data_utils +from xtesting.core import testcase + +from functest.utils import config +from functest.utils import env +from functest.utils import functest_utils + + +class NewProject(): + """Ease creating new projects/users""" + # pylint: disable=too-many-instance-attributes + + __logger = logging.getLogger(__name__) + + def __init__(self, cloud, case_name, guid): + self.cloud = None + self.orig_cloud = cloud + self.case_name = case_name + self.guid = guid + self.project = None + self.user = None + self.password = None + self.domain = None + self.role_name = None + self.default_member = env.get('NEW_USER_ROLE') + + def create(self): + """Create projects/users""" + assert self.orig_cloud + assert self.case_name + self.password = data_utils.rand_password().replace('%', '!') + self.__logger.debug("password: %s", self.password) + self.domain = self.orig_cloud.get_domain( + name_or_id=self.orig_cloud.auth.get( + "project_domain_name", "Default")) + self.project = self.orig_cloud.create_project( + name=f'{self.case_name[:18]}-project_{self.guid}', + description=f"Created by OPNFV Functest: {self.case_name}", + domain_id=self.domain.id) + self.__logger.debug("project: %s", self.project) + self.user = self.orig_cloud.create_user( + name=f'{self.case_name}-user_{self.guid}', + password=self.password, + domain_id=self.domain.id) + self.__logger.debug("user: %s", self.user) + try: + if self.orig_cloud.get_role(self.default_member): + self.role_name = self.default_member + elif self.orig_cloud.get_role(self.default_member.lower()): + self.role_name = self.default_member.lower() + else: + raise Exception(f"Cannot detect {self.default_member}") + except Exception: # pylint: disable=broad-except + self.__logger.info("Creating default role %s", self.default_member) + role = self.orig_cloud.create_role(self.default_member) + self.role_name = role.name + self.__logger.debug("role: %s", role) + self.orig_cloud.grant_role( + self.role_name, user=self.user.id, project=self.project.id, + domain=self.domain.id) + osconfig = os_client_config.config.OpenStackConfig() + osconfig.cloud_config[ + 'clouds']['envvars']['project_name'] = self.project.name + osconfig.cloud_config[ + 'clouds']['envvars']['project_id'] = self.project.id + osconfig.cloud_config['clouds']['envvars']['username'] = self.user.name + osconfig.cloud_config['clouds']['envvars']['password'] = self.password + self.__logger.debug("cloud_config %s", osconfig.cloud_config) + self.cloud = shade.OpenStackCloud( + cloud_config=osconfig.get_one_cloud()) + self.__logger.debug("new cloud %s", self.cloud.auth) + + def get_environ(self): + "Get new environ" + environ = dict( + os.environ, + OS_USERNAME=self.user.name, + OS_PROJECT_NAME=self.project.name, + OS_PROJECT_ID=self.project.id, + OS_PASSWORD=self.password) + try: + del environ['OS_TENANT_NAME'] + del environ['OS_TENANT_ID'] + except Exception: # pylint: disable=broad-except + pass + return environ + + def clean(self): + """Remove projects/users""" + try: + assert self.orig_cloud + if self.user: + self.orig_cloud.delete_user(self.user.id) + if self.project: + self.orig_cloud.delete_project(self.project.id) + secgroups = self.orig_cloud.list_security_groups( + filters={'name': 'default', + 'project_id': self.project.id}) + if secgroups: + sec_id = secgroups[0].id + self.orig_cloud.delete_security_group(sec_id) + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot clean all resources") + + +class TenantNetwork1(testcase.TestCase): + # pylint: disable=too-many-instance-attributes + """Create a tenant network (scenario1) + + It creates and configures all tenant network resources required by + advanced testcases (subnet, network and router). + + It ensures that all testcases inheriting from TenantNetwork1 could work + without network specific configurations (or at least read the same config + data). + """ + + __logger = logging.getLogger(__name__) + cidr = '192.168.120.0/24' + shared_network = False + + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs["case_name"] = 'tenantnetwork1' + super().__init__(**kwargs) + self.dir_results = os.path.join(getattr(config.CONF, 'dir_results')) + self.res_dir = os.path.join(self.dir_results, self.case_name) + self.output_log_name = 'functest.log' + self.output_debug_log_name = 'functest.debug.log' + self.ext_net = None + try: + cloud_config = os_client_config.get_config() + self.cloud = self.orig_cloud = shade.OpenStackCloud( + cloud_config=cloud_config) + except Exception: # pylint: disable=broad-except + self.cloud = self.orig_cloud = None + self.__logger.exception("Cannot connect to Cloud") + if env.get('NO_TENANT_NETWORK').lower() != 'true': + try: + self.ext_net = self.get_external_network(self.cloud) + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot get the external network") + self.guid = str(uuid.uuid4()) + self.network = None + self.subnet = None + self.router = None + + @staticmethod + def get_external_network(cloud): + """ + Return the configured external network name or + the first retrieved external network name + """ + assert cloud + if env.get("EXTERNAL_NETWORK"): + network = cloud.get_network( + env.get("EXTERNAL_NETWORK"), {"router:external": True}) + if network: + return network + networks = cloud.list_networks({"router:external": True}) + if networks: + return networks[0] + return None + + @staticmethod + def get_default_role(cloud, member="Member"): + """Get the default role + + It also tests the role in lowercase to avoid possible conflicts. + """ + role = cloud.get_role(member) + if not role: + role = cloud.get_role(member.lower()) + return role + + @staticmethod + def get_public_auth_url(cloud): + """Get Keystone public endpoint""" + keystone_id = functest_utils.search_services(cloud, 'keystone')[0].id + endpoint = cloud.search_endpoints( + filters={'interface': 'public', + 'service_id': keystone_id})[0].url + return endpoint + + def create_network_resources(self): + """Create all tenant network resources + + It creates a router which gateway is the external network detected. + The new subnet is attached to that router. + + Raises: expection on error + """ + assert self.cloud + if env.get('NO_TENANT_NETWORK').lower() != 'true': + assert self.ext_net + provider = {} + if hasattr(config.CONF, f'{self.case_name}_network_type'): + provider["network_type"] = getattr( + config.CONF, f'{self.case_name}_network_type') + if hasattr(config.CONF, f'{self.case_name}_physical_network'): + provider["physical_network"] = getattr( + config.CONF, f'{self.case_name}_physical_network') + if hasattr(config.CONF, f'{self.case_name}_segmentation_id'): + provider["segmentation_id"] = getattr( + config.CONF, f'{self.case_name}_segmentation_id') + domain = self.orig_cloud.get_domain( + name_or_id=self.orig_cloud.auth.get( + "project_domain_name", "Default")) + project = self.orig_cloud.get_project( + self.cloud.auth['project_name'], + domain_id=domain.id) + self.network = self.orig_cloud.create_network( + f'{self.case_name}-net_{self.guid}', + provider=provider, project_id=project.id, + shared=self.shared_network) + self.__logger.debug("network: %s", self.network) + + self.subnet = self.cloud.create_subnet( + self.network.id, + subnet_name=f'{self.case_name}-subnet_{self.guid}', + cidr=getattr( + config.CONF, f'{self.case_name}_private_subnet_cidr', + self.cidr), + enable_dhcp=True, + dns_nameservers=[env.get('NAMESERVER')]) + self.__logger.debug("subnet: %s", self.subnet) + + self.router = self.cloud.create_router( + name=f'{self.case_name}-router_{self.guid}', + ext_gateway_net_id=self.ext_net.id if self.ext_net else None) + self.__logger.debug("router: %s", self.router) + self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id) + + def run(self, **kwargs): + status = testcase.TestCase.EX_RUN_ERROR + try: + assert self.cloud + self.start_time = time.time() + if env.get('NO_TENANT_NETWORK').lower() != 'true': + self.create_network_resources() + self.result = 100 + status = testcase.TestCase.EX_OK + except Exception: # pylint: disable=broad-except + self.__logger.exception('Cannot run %s', self.case_name) + finally: + self.stop_time = time.time() + return status + + def clean(self): + try: + assert self.cloud + if self.router: + if self.subnet: + self.cloud.remove_router_interface( + self.router, self.subnet.id) + self.cloud.delete_router(self.router.id) + if self.subnet: + self.cloud.delete_subnet(self.subnet.id) + if self.network: + self.cloud.delete_network(self.network.id) + except Exception: # pylint: disable=broad-except + self.__logger.exception("cannot clean all resources") + + +class TenantNetwork2(TenantNetwork1): + """Create a tenant network (scenario2) + + It creates new user/project before creating and configuring all tenant + network resources required by a testcase (subnet, network and router). + + It ensures that all testcases inheriting from TenantNetwork2 could work + without network specific configurations (or at least read the same config + data). + """ + + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + if "case_name" not in kwargs: + kwargs["case_name"] = 'tenantnetwork2' + super().__init__(**kwargs) + try: + assert self.cloud + self.project = NewProject( + self.cloud, self.case_name, self.guid) + self.project.create() + self.cloud = self.project.cloud + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot create user or project") + self.cloud = None + self.project = None + + def clean(self): + try: + super().clean() + assert self.project + self.project.clean() + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot clean all resources") diff --git a/functest/core/testcase.py b/functest/core/testcase.py deleted file mode 100644 index fa3802872..000000000 --- a/functest/core/testcase.py +++ /dev/null @@ -1,186 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class of all Functest TestCases.""" - -import logging -import os - -import functest.utils.functest_utils as ft_utils - -import prettytable - - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - - -class TestCase(object): - """Base model for single test case.""" - - EX_OK = os.EX_OK - """everything is OK""" - - EX_RUN_ERROR = os.EX_SOFTWARE - """run() failed""" - - EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1 - """push_to_db() failed""" - - EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2 - """results are false""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - self.details = {} - self.project_name = kwargs.get('project_name', 'functest') - self.case_name = kwargs.get('case_name', '') - self.criteria = kwargs.get('criteria', 100) - self.result = 0 - self.start_time = 0 - self.stop_time = 0 - - def __str__(self): - try: - assert self.project_name - assert self.case_name - result = 'PASS' if(self.is_successful( - ) == TestCase.EX_OK) else 'FAIL' - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'project', 'duration', - 'result']) - msg.add_row([self.case_name, self.project_name, - self.get_duration(), result]) - return msg.get_string() - except AssertionError: - self.__logger.error("We cannot print invalid objects") - return super(TestCase, self).__str__() - - def get_duration(self): - """Return the duration of the test case. - - Returns: - duration if start_time and stop_time are set - "XX:XX" otherwise. - """ - try: - assert self.start_time - assert self.stop_time - if self.stop_time < self.start_time: - return "XX:XX" - return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod( - self.stop_time - self.start_time, 60)) - except Exception: # pylint: disable=broad-except - self.__logger.error("Please run test before getting the duration") - return "XX:XX" - - def is_successful(self): - """Interpret the result of the test case. - - It allows getting the result of TestCase. It completes run() - which only returns the execution status. - - It can be overriden if checking result is not suitable. - - Returns: - TestCase.EX_OK if result is 'PASS'. - TestCase.EX_TESTCASE_FAILED otherwise. - """ - try: - assert self.criteria - assert self.result is not None - if (not isinstance(self.result, str) and - not isinstance(self.criteria, str)): - if self.result >= self.criteria: - return TestCase.EX_OK - else: - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - # In this case criteria is unread. - self.__logger.warning( - "Please update result which must be an int!") - if self.result == 'PASS': - return TestCase.EX_OK - except AssertionError: - self.__logger.error("Please run test before checking the results") - return TestCase.EX_TESTCASE_FAILED - - def run(self, **kwargs): - """Run the test case. - - It allows running TestCase and getting its execution - status. - - The subclasses must override the default implementation which - is false on purpose. - - The new implementation must set the following attributes to - push the results to DB: - - * result, - * start_time, - * stop_time. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_RUN_ERROR. - """ - # pylint: disable=unused-argument - self.__logger.error("Run must be implemented") - return TestCase.EX_RUN_ERROR - - def push_to_db(self): - """Push the results of the test case to the DB. - - It allows publishing the results and to check the status. - - It could be overriden if the common implementation is not - suitable. The following attributes must be set before pushing - the results to DB: - - * project_name, - * case_name, - * result, - * start_time, - * stop_time. - - Returns: - TestCase.EX_OK if results were pushed to DB. - TestCase.EX_PUSH_TO_DB_ERROR otherwise. - """ - try: - assert self.project_name - assert self.case_name - assert self.start_time - assert self.stop_time - pub_result = 'PASS' if self.is_successful( - ) == TestCase.EX_OK else 'FAIL' - if ft_utils.push_results_to_db( - self.project_name, self.case_name, self.start_time, - self.stop_time, pub_result, self.details): - self.__logger.info( - "The results were successfully pushed to DB") - return TestCase.EX_OK - else: - self.__logger.error("The results cannot be pushed to DB") - return TestCase.EX_PUSH_TO_DB_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("The results cannot be pushed to DB") - return TestCase.EX_PUSH_TO_DB_ERROR - - def clean(self): - """Clean the resources. - - It can be overriden if resources must be deleted after - running the test case. - """ diff --git a/functest/core/unit.py b/functest/core/unit.py deleted file mode 100644 index 61b5a58d9..000000000 --- a/functest/core/unit.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cable Television Laboratories, Inc. and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class to run unittest.TestSuite as TestCase.""" - -from __future__ import division - -import logging -import time -import unittest - -import six - -from functest.core import testcase - -__author__ = ("Steven Pisarski <s.pisarski@cablelabs.com>, " - "Cedric Ollivier <cedric.ollivier@orange.com>") - - -class Suite(testcase.TestCase): - """Base model for running unittest.TestSuite.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(Suite, self).__init__(**kwargs) - self.suite = None - - def run(self, **kwargs): - """Run the test suite. - - It allows running any unittest.TestSuite and getting its - execution status. - - By default, it runs the suite defined as instance attribute. - It can be overriden by passing name as arg. It must - conform with TestLoader.loadTestsFromName(). - - It sets the following attributes required to push the results - to DB: - - * result, - * start_time, - * stop_time, - * details. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_OK if any TestSuite has been run, - TestCase.EX_RUN_ERROR otherwise. - """ - try: - name = kwargs["name"] - try: - self.suite = unittest.TestLoader().loadTestsFromName(name) - except ImportError: - self.__logger.error("Can not import %s", name) - return testcase.TestCase.EX_RUN_ERROR - except KeyError: - pass - try: - assert self.suite - self.start_time = time.time() - stream = six.StringIO() - result = unittest.TextTestRunner( - stream=stream, verbosity=2).run(self.suite) - self.__logger.debug("\n\n%s", stream.getvalue()) - self.stop_time = time.time() - self.details = { - "testsRun": result.testsRun, - "failures": len(result.failures), - "errors": len(result.errors), - "stream": stream.getvalue()} - self.result = 100 * ( - (result.testsRun - (len(result.failures) + - len(result.errors))) / - result.testsRun) - return testcase.TestCase.EX_OK - except AssertionError: - self.__logger.error("No suite is defined") - return testcase.TestCase.EX_RUN_ERROR - except ZeroDivisionError: - self.__logger.error("No test has been run") - return testcase.TestCase.EX_RUN_ERROR diff --git a/functest/core/vnf.py b/functest/core/vnf.py deleted file mode 100644 index 73aaf446e..000000000 --- a/functest/core/vnf.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class of all VNF TestCases.""" - -import logging -import time - -import functest.core.testcase as base -from functest.utils.constants import CONST -from snaps.config.user import UserConfig -from snaps.config.project import ProjectConfig -from snaps.openstack.create_user import OpenStackUser -from snaps.openstack.create_project import OpenStackProject -from snaps.openstack.tests import openstack_tests - -__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, " - "Valentin Boucher <valentin.boucher@orange.com>") - - -class VnfPreparationException(Exception): - """Raise when VNF preparation cannot be executed.""" - - -class OrchestratorDeploymentException(Exception): - """Raise when orchestrator cannot be deployed.""" - - -class VnfDeploymentException(Exception): - """Raise when VNF cannot be deployed.""" - - -class VnfTestException(Exception): - """Raise when VNF cannot be tested.""" - - -class VnfOnBoarding(base.TestCase): - """Base model for VNF test cases.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(VnfOnBoarding, self).__init__(**kwargs) - self.tenant_name = CONST.__getattribute__( - 'vnf_{}_tenant_name'.format(self.case_name)) - self.snaps_creds = {} - self.created_object = [] - self.os_project = None - - def run(self, **kwargs): - """ - Run of the VNF test case: - - * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...), - * Deploy the VNF, - * Perform tests on the VNF - - A VNF test case is successfull when the 3 steps are PASS - If one of the step is FAIL, the test case is FAIL - - Returns: - TestCase.EX_OK if result is 'PASS'. - TestCase.EX_TESTCASE_FAILED otherwise. - """ - self.start_time = time.time() - - try: - self.prepare() - if (self.deploy_orchestrator() and - self.deploy_vnf() and - self.test_vnf()): - self.stop_time = time.time() - # Calculation with different weight depending on the steps TODO - self.result = 100 - return base.TestCase.EX_OK - else: - self.result = 0 - self.stop_time = time.time() - return base.TestCase.EX_TESTCASE_FAILED - except Exception: # pylint: disable=broad-except - self.stop_time = time.time() - self.__logger.exception("Exception on VNF testing") - return base.TestCase.EX_TESTCASE_FAILED - - def prepare(self): - """ - Prepare the environment for VNF testing: - - * Creation of a user, - * Creation of a tenant, - * Allocation admin role to the user on this tenant - - Returns base.TestCase.EX_OK if preparation is successfull - - Raise VnfPreparationException in case of problem - """ - try: - tenant_description = CONST.__getattribute__( - 'vnf_{}_tenant_description'.format(self.case_name)) - self.__logger.info("Prepare VNF: %s, description: %s", - self.tenant_name, tenant_description) - snaps_creds = openstack_tests.get_credentials( - os_env_file=CONST.__getattribute__('openstack_creds')) - - project_creator = OpenStackProject( - snaps_creds, - ProjectConfig( - name=self.tenant_name, - description=tenant_description - )) - project_creator.create() - self.created_object.append(project_creator) - self.os_project = project_creator - - user_creator = OpenStackUser( - snaps_creds, - UserConfig( - name=self.tenant_name, - password=self.tenant_name, - roles={'admin': self.tenant_name})) - - user_creator.create() - self.created_object.append(user_creator) - - self.snaps_creds = user_creator.get_os_creds(self.tenant_name) - - return base.TestCase.EX_OK - except Exception: # pylint: disable=broad-except - self.__logger.exception("Exception raised during VNF preparation") - raise VnfPreparationException - - def deploy_orchestrator(self): - """ - Deploy an orchestrator (optional). - - If this method is overriden then raise orchestratorDeploymentException - if error during orchestrator deployment - """ - self.__logger.info("Deploy orchestrator (if necessary)") - return True - - def deploy_vnf(self): - """ - Deploy the VNF - - This function MUST be implemented by vnf test cases. - The details section MAY be updated in the vnf test cases. - - The deployment can be executed via a specific orchestrator - or using build-in orchestrators such as heat, OpenBaton, cloudify, - juju, onap, ... - - Returns: - True if the VNF is properly deployed - False if the VNF is not deployed - - Raise VnfDeploymentException if error during VNF deployment - """ - self.__logger.error("VNF must be deployed") - raise VnfDeploymentException - - def test_vnf(self): - """ - Test the VNF - - This function MUST be implemented by vnf test cases. - The details section MAY be updated in the vnf test cases. - - Once a VNF is deployed, it is assumed that specific test suite can be - run to validate the VNF. - Please note that the same test suite can be used on several test case - (e.g. clearwater test suite can be used whatever the orchestrator used - for the deployment) - - Returns: - True if VNF tests are PASS - False if test suite is FAIL - - Raise VnfTestException if error during VNF test - """ - self.__logger.error("VNF must be tested") - raise VnfTestException - - def clean(self): - """ - Clean VNF test case. - - It is up to the test providers to delete resources used for the tests. - By default we clean: - - * the user, - * the tenant - """ - self.__logger.info("test cleaning") - self.__logger.info('Remove the cloudify manager OS object ..') - for creator in reversed(self.created_object): - try: - creator.clean() - except Exception as exc: # pylint: disable=broad-except - self.__logger.error('Unexpected error cleaning - %s', exc) |