aboutsummaryrefslogtreecommitdiffstats
path: root/functest/core
diff options
context:
space:
mode:
Diffstat (limited to 'functest/core')
-rw-r--r--functest/core/cloudify.py219
-rw-r--r--functest/core/feature.py134
-rw-r--r--functest/core/robotframework.py125
-rw-r--r--functest/core/singlevm.py549
-rw-r--r--functest/core/tenantnetwork.py326
-rw-r--r--functest/core/testcase.py186
-rw-r--r--functest/core/unit.py92
-rw-r--r--functest/core/vnf.py206
8 files changed, 1094 insertions, 743 deletions
diff --git a/functest/core/cloudify.py b/functest/core/cloudify.py
new file mode 100644
index 000000000..966d33645
--- /dev/null
+++ b/functest/core/cloudify.py
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Cloudify testcase implementation."""
+
+from __future__ import division
+
+import logging
+import os
+import time
+import traceback
+
+from cloudify_rest_client import CloudifyClient
+from cloudify_rest_client.executions import Execution
+import scp
+
+from functest.core import singlevm
+
+
+class Cloudify(singlevm.SingleVm2):
+ """Cloudify Orchestrator Case."""
+
+ __logger = logging.getLogger(__name__)
+
+ filename = ('/home/opnfv/functest/images/'
+ 'ubuntu-18.04-server-cloudimg-amd64.img')
+ flavor_ram = 4096
+ flavor_vcpus = 2
+ flavor_disk = 40
+ username = 'ubuntu'
+ ssh_connect_loops = 12
+ create_server_timeout = 600
+ ports = [80, 443, 5671, 53333]
+
+ cloudify_archive = ('/home/opnfv/functest/images/'
+ 'cloudify-docker-manager-community-19.01.24.tar')
+ cloudify_container = "docker-cfy-manager:latest"
+
+ def __init__(self, **kwargs):
+ """Initialize Cloudify testcase object."""
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = "cloudify"
+ super().__init__(**kwargs)
+ self.cfy_client = None
+
+ def prepare(self):
+ super().prepare()
+ for port in self.ports:
+ self.cloud.create_security_group_rule(
+ self.sec.id, port_range_min=port, port_range_max=port,
+ protocol='tcp', direction='ingress')
+
+ def execute(self):
+ """
+ Deploy Cloudify Manager.
+ """
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(self.cloudify_archive,
+ remote_path=os.path.basename(self.cloudify_archive))
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo apt-get update && "
+ "sudo apt-get install -y docker.io && "
+ "sudo docker load -i "
+ f"~/{os.path.basename(self.cloudify_archive)} && "
+ "sudo docker run --name cfy_manager_local -d "
+ "--restart unless-stopped -v /sys/fs/cgroup:/sys/fs/cgroup:ro "
+ "--tmpfs /run --tmpfs /run/lock --security-opt seccomp:unconfined "
+ f"--cap-add SYS_ADMIN --network=host {self.cloudify_container}")
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+ self.cfy_client = CloudifyClient(
+ host=self.fip.floating_ip_address if self.fip else (
+ self.sshvm.public_v4),
+ username='admin', password='admin', tenant='default_tenant')
+ self.__logger.info("Attemps running status of the Manager")
+ secret_key = "foo"
+ secret_value = "bar"
+ for loop in range(20):
+ try:
+ self.__logger.debug(
+ "status %s", self.cfy_client.manager.get_status())
+ cfy_status = self.cfy_client.manager.get_status()['status']
+ self.__logger.info(
+ "The current manager status is %s", cfy_status)
+ if str(cfy_status) != 'running':
+ raise Exception("Cloudify Manager isn't up and running")
+ for secret in iter(self.cfy_client.secrets.list()):
+ if secret_key == secret["key"]:
+ self.__logger.debug("Updating secrets: %s", secret_key)
+ self.cfy_client.secrets.update(
+ secret_key, secret_value)
+ break
+ else:
+ self.__logger.debug("Creating secrets: %s", secret_key)
+ self.cfy_client.secrets.create(secret_key, secret_value)
+ self.cfy_client.secrets.delete(secret_key)
+ self.__logger.info("Secrets API successfully reached")
+ break
+ except Exception: # pylint: disable=broad-except
+ self.__logger.debug(
+ "try %s: Cloudify Manager isn't up and running \n%s",
+ loop + 1, traceback.format_exc())
+ time.sleep(30)
+ else:
+ self.__logger.error("Cloudify Manager isn't up and running")
+ return 1
+ self.__logger.info("Cloudify Manager is up and running")
+ return 0
+
+ def put_private_key(self):
+ """Put private keypair in manager"""
+ self.__logger.info("Put private keypair in manager")
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo docker cp ~/cloudify_ims.pem "
+ "cfy_manager_local:/etc/cloudify/ && "
+ "sudo docker exec cfy_manager_local "
+ "chmod 444 /etc/cloudify/cloudify_ims.pem")
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+
+ def upload_cfy_plugins(self, yaml, wgn):
+ """Upload Cloudify plugins"""
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo docker exec cfy_manager_local "
+ f"cfy plugins upload -y {yaml} {wgn} && "
+ "sudo docker exec cfy_manager_local cfy status")
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+
+ def kill_existing_execution(self, dep_name):
+ """kill existing execution"""
+ try:
+ self.__logger.info('Deleting the current deployment')
+ exec_list = self.cfy_client.executions.list()
+ for execution in exec_list:
+ if execution['status'] == "started":
+ try:
+ self.cfy_client.executions.cancel(
+ execution['id'], force=True)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.warning("Can't cancel the current exec")
+ execution = self.cfy_client.executions.start(
+ dep_name, 'uninstall', parameters=dict(ignore_failure=True))
+ wait_for_execution(self.cfy_client, execution, self.__logger)
+ self.cfy_client.deployments.delete(dep_name)
+ time.sleep(10)
+ self.cfy_client.blueprints.delete(dep_name)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Some issue during the undeployment ..")
+
+
+def wait_for_execution(client, execution, logger, timeout=3600, ):
+ """Wait for a workflow execution on Cloudify Manager."""
+ # if execution already ended - return without waiting
+ if execution.status in Execution.END_STATES:
+ return execution
+
+ if timeout is not None:
+ deadline = time.time() + timeout
+
+ # Poll for execution status and execution logs, until execution ends
+ # and we receive an event of type in WORKFLOW_END_TYPES
+ offset = 0
+ batch_size = 50
+ event_list = []
+ execution_ended = False
+ while True:
+ event_list = client.events.list(
+ execution_id=execution.id,
+ _offset=offset,
+ _size=batch_size,
+ include_logs=True,
+ sort='@timestamp').items
+
+ offset = offset + len(event_list)
+ for event in event_list:
+ logger.debug(event.get('message'))
+
+ if timeout is not None:
+ if time.time() > deadline:
+ raise RuntimeError(
+ 'execution of operation {execution.workflow_id} for '
+ 'deployment {execution.deployment_id} timed out')
+ # update the remaining timeout
+ timeout = deadline - time.time()
+
+ if not execution_ended:
+ execution = client.executions.get(execution.id)
+ execution_ended = execution.status in Execution.END_STATES
+
+ if execution_ended:
+ break
+
+ time.sleep(5)
+
+ return execution
+
+
+def get_execution_id(client, deployment_id):
+ """
+ Get the execution id of a env preparation.
+
+ network, security group, fip, VM creation
+ """
+ executions = client.executions.list(deployment_id=deployment_id)
+ for execution in executions:
+ if execution.workflow_id == 'create_deployment_environment':
+ return execution
+ raise RuntimeError('Failed to get create_deployment_environment '
+ 'workflow execution.'
+ f'Available executions: {executions}')
diff --git a/functest/core/feature.py b/functest/core/feature.py
deleted file mode 100644
index 3200dad85..000000000
--- a/functest/core/feature.py
+++ /dev/null
@@ -1,134 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 ZTE Corp and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent classes of all Functest Features.
-
-Feature is considered as TestCase offered by Third-party. It offers
-helpers to run any python method or any bash command.
-"""
-
-import logging
-import subprocess
-import time
-
-import functest.core.testcase as base
-from functest.utils.constants import CONST
-
-__author__ = ("Serena Feng <feng.xiaowei@zte.com.cn>, "
- "Cedric Ollivier <cedric.ollivier@orange.com>")
-
-
-class Feature(base.TestCase):
- """Base model for single feature."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- super(Feature, self).__init__(**kwargs)
- self.result_file = "{}/{}.log".format(
- CONST.__getattribute__('dir_results'), self.case_name)
- try:
- module = kwargs['run']['module']
- self.logger = logging.getLogger(module)
- except KeyError:
- self.__logger.warning(
- "Cannot get module name %s. Using %s as fallback",
- kwargs, self.case_name)
- self.logger = logging.getLogger(self.case_name)
- handler = logging.StreamHandler()
- handler.setLevel(logging.WARN)
- self.logger.addHandler(handler)
- handler = logging.FileHandler(self.result_file)
- handler.setLevel(logging.DEBUG)
- self.logger.addHandler(handler)
- formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- self.logger.addHandler(handler)
-
- def execute(self, **kwargs):
- """Execute the Python method.
-
- The subclasses must override the default implementation which
- is false on purpose.
-
- The new implementation must return 0 if success or anything
- else if failure.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- -1.
- """
- # pylint: disable=unused-argument,no-self-use
- return -1
-
- def run(self, **kwargs):
- """Run the feature.
-
- It allows executing any Python method by calling execute().
-
- It sets the following attributes required to push the results
- to DB:
-
- * result,
- * start_time,
- * stop_time.
-
- It doesn't fulfill details when pushing the results to the DB.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- TestCase.EX_OK if execute() returns 0,
- TestCase.EX_RUN_ERROR otherwise.
- """
- self.start_time = time.time()
- exit_code = base.TestCase.EX_RUN_ERROR
- self.result = 0
- try:
- if self.execute(**kwargs) == 0:
- exit_code = base.TestCase.EX_OK
- self.result = 100
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("%s FAILED", self.project_name)
- self.__logger.info("Test result is stored in '%s'", self.result_file)
- self.stop_time = time.time()
- return exit_code
-
-
-class BashFeature(Feature):
- """Class designed to run any bash command."""
-
- __logger = logging.getLogger(__name__)
-
- def execute(self, **kwargs):
- """Execute the cmd passed as arg
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- 0 if cmd returns 0,
- -1 otherwise.
- """
- ret = -1
- try:
- cmd = kwargs["cmd"]
- with open(self.result_file, 'w+') as f_stdout:
- proc = subprocess.Popen(cmd.split(), stdout=f_stdout,
- stderr=subprocess.STDOUT)
- ret = proc.wait()
- if ret != 0:
- self.__logger.error("Execute command: %s failed", cmd)
- except KeyError:
- self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs)
- return ret
diff --git a/functest/core/robotframework.py b/functest/core/robotframework.py
deleted file mode 100644
index 689d9d946..000000000
--- a/functest/core/robotframework.py
+++ /dev/null
@@ -1,125 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define classes required to run any Robot suites."""
-
-from __future__ import division
-
-import errno
-import logging
-import os
-
-import robot.api
-from robot.errors import RobotError
-import robot.run
-from robot.utils.robottime import timestamp_to_secs
-from six import StringIO
-
-from functest.core import testcase
-from functest.utils import constants
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-
-class ResultVisitor(robot.api.ResultVisitor):
- """Visitor to get result details."""
-
- def __init__(self):
- self._data = []
-
- def visit_test(self, test):
- output = {}
- output['name'] = test.name
- output['parent'] = test.parent.name
- output['status'] = test.status
- output['starttime'] = test.starttime
- output['endtime'] = test.endtime
- output['critical'] = test.critical
- output['text'] = test.message
- output['elapsedtime'] = test.elapsedtime
- self._data.append(output)
-
- def get_data(self):
- """Get the details of the result."""
- return self._data
-
-
-class RobotFramework(testcase.TestCase):
- """RobotFramework runner."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- self.res_dir = os.path.join(
- constants.CONST.__getattribute__('dir_results'), 'robot')
- self.xml_file = os.path.join(self.res_dir, 'output.xml')
- super(RobotFramework, self).__init__(**kwargs)
-
- def parse_results(self):
- """Parse output.xml and get the details in it."""
- result = robot.api.ExecutionResult(self.xml_file)
- visitor = ResultVisitor()
- result.visit(visitor)
- try:
- self.result = 100 * (
- result.suite.statistics.critical.passed /
- result.suite.statistics.critical.total)
- except ZeroDivisionError:
- self.__logger.error("No test has been run")
- self.start_time = timestamp_to_secs(result.suite.starttime)
- self.stop_time = timestamp_to_secs(result.suite.endtime)
- self.details = {}
- self.details['description'] = result.suite.name
- self.details['tests'] = visitor.get_data()
-
- def run(self, **kwargs):
- """Run the RobotFramework suites
-
- Here are the steps:
- * create the output directories if required,
- * get the results in output.xml,
- * delete temporary files.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- EX_OK if all suites ran well.
- EX_RUN_ERROR otherwise.
- """
- try:
- suites = kwargs["suites"]
- variable = kwargs.get("variable", [])
- except KeyError:
- self.__logger.exception("Mandatory args were not passed")
- return self.EX_RUN_ERROR
- try:
- os.makedirs(self.res_dir)
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- self.__logger.exception("Cannot create %s", self.res_dir)
- return self.EX_RUN_ERROR
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot create %s", self.res_dir)
- return self.EX_RUN_ERROR
- stream = StringIO()
- robot.run(*suites, variable=variable, output=self.xml_file,
- log='NONE', report='NONE', stdout=stream)
- self.__logger.info("\n" + stream.getvalue())
- self.__logger.info("Results were successfully generated")
- try:
- self.parse_results()
- self.__logger.info("Results were successfully parsed")
- except RobotError as ex:
- self.__logger.error("Run suites before publishing: %s", ex.message)
- return self.EX_RUN_ERROR
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot parse results")
- return self.EX_RUN_ERROR
- return self.EX_OK
diff --git a/functest/core/singlevm.py b/functest/core/singlevm.py
new file mode 100644
index 000000000..4bce516d3
--- /dev/null
+++ b/functest/core/singlevm.py
@@ -0,0 +1,549 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Ease deploying a single VM reachable via ssh
+
+It offers a simple way to create all tenant network resources + a VM for
+advanced testcases (e.g. deploying an orchestrator).
+"""
+
+import logging
+import re
+import tempfile
+import time
+
+import paramiko
+from xtesting.core import testcase
+
+from functest.core import tenantnetwork
+from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
+
+
+class VmReady1(tenantnetwork.TenantNetwork1):
+ """Prepare a single VM (scenario1)
+
+ It inherits from TenantNetwork1 which creates all network resources and
+ prepares a future VM attached to that network.
+
+ It ensures that all testcases inheriting from SingleVm1 could work
+ without specific configurations (or at least read the same config data).
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ __logger = logging.getLogger(__name__)
+ filename = '/home/opnfv/functest/images/cirros-0.6.1-x86_64-disk.img'
+ image_format = 'qcow2'
+ extra_properties = {}
+ filename_alt = filename
+ image_alt_format = image_format
+ extra_alt_properties = extra_properties
+ visibility = 'private'
+ flavor_ram = 512
+ flavor_vcpus = 1
+ flavor_disk = 1
+ flavor_extra_specs = {}
+ flavor_alt_ram = 1024
+ flavor_alt_vcpus = 1
+ flavor_alt_disk = 1
+ flavor_alt_extra_specs = flavor_extra_specs
+ create_server_timeout = 180
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'vmready1'
+ super().__init__(**kwargs)
+ self.image = None
+ self.flavor = None
+
+ def publish_image(self, name=None):
+ """Publish image
+
+ It allows publishing multiple images for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: image
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ extra_properties = self.extra_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
+ extra_properties.update(
+ getattr(config.CONF, f'{self.case_name}_extra_properties', {}))
+ image = self.cloud.create_image(
+ name if name else f'{self.case_name}-img_{self.guid}',
+ filename=getattr(
+ config.CONF, f'{self.case_name}_image',
+ self.filename),
+ meta=extra_properties,
+ disk_format=getattr(
+ config.CONF, f'{self.case_name}_image_format',
+ self.image_format),
+ visibility=getattr(
+ config.CONF, f'{self.case_name}_visibility',
+ self.visibility),
+ wait=True)
+ self.__logger.debug("image: %s", image)
+ return image
+
+ def publish_image_alt(self, name=None):
+ """Publish alternative image
+
+ It allows publishing multiple images for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: image
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ extra_alt_properties = self.extra_alt_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_alt_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
+ extra_alt_properties.update(
+ getattr(config.CONF, f'{self.case_name}_extra_alt_properties', {}))
+ image = self.cloud.create_image(
+ name if name else f'{self.case_name}-img_alt_{self.guid}',
+ filename=getattr(
+ config.CONF, f'{self.case_name}_image_alt',
+ self.filename_alt),
+ meta=extra_alt_properties,
+ disk_format=getattr(
+ config.CONF, f'{self.case_name}_image_alt_format',
+ self.image_format),
+ visibility=getattr(
+ config.CONF, f'{self.case_name}_visibility',
+ self.visibility),
+ wait=True)
+ self.__logger.debug("image: %s", image)
+ return image
+
+ def create_flavor(self, name=None):
+ """Create flavor
+
+ It allows creating multiple flavors for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: flavor
+
+ Raises: expection on error
+ """
+ assert self.orig_cloud
+ flavor = self.orig_cloud.create_flavor(
+ name if name else f'{self.case_name}-flavor_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_ram',
+ self.flavor_ram),
+ getattr(config.CONF, f'{self.case_name}_flavor_vcpus',
+ self.flavor_vcpus),
+ getattr(config.CONF, f'{self.case_name}_flavor_disk',
+ self.flavor_disk))
+ self.__logger.debug("flavor: %s", flavor)
+ flavor_extra_specs = self.flavor_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
+ flavor_extra_specs.update(
+ getattr(config.CONF,
+ f'{self.case_name}_flavor_extra_specs', {}))
+ self.orig_cloud.set_flavor_specs(flavor.id, flavor_extra_specs)
+ return flavor
+
+ def create_flavor_alt(self, name=None):
+ """Create flavor
+
+ It allows creating multiple alt flavors for the child testcases. It
+ forces the same configuration for all subtestcases.
+
+ Returns: flavor
+
+ Raises: expection on error
+ """
+ assert self.orig_cloud
+ flavor = self.orig_cloud.create_flavor(
+ name if name else f'{self.case_name}-flavor_alt_{self.guid}',
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_ram',
+ self.flavor_alt_ram),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_vcpus',
+ self.flavor_alt_vcpus),
+ getattr(config.CONF, f'{self.case_name}_flavor_alt_disk',
+ self.flavor_alt_disk))
+ self.__logger.debug("flavor: %s", flavor)
+ flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_alt_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
+ flavor_alt_extra_specs.update(
+ getattr(config.CONF,
+ f'{self.case_name}_flavor_alt_extra_specs', {}))
+ self.orig_cloud.set_flavor_specs(
+ flavor.id, flavor_alt_extra_specs)
+ return flavor
+
+ def boot_vm(self, name=None, **kwargs):
+ """Boot the virtual machine
+
+ It allows booting multiple machines for the child testcases. It forces
+ the same configuration for all subtestcases.
+
+ Returns: vm
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ vm1 = self.cloud.create_server(
+ name if name else f'{self.case_name}-vm_{self.guid}',
+ image=self.image.id, flavor=self.flavor.id,
+ auto_ip=False,
+ network=self.network.id if self.network else env.get(
+ "EXTERNAL_NETWORK"),
+ timeout=self.create_server_timeout, wait=True, **kwargs)
+ self.__logger.debug("vm: %s", vm1)
+ return vm1
+
+ def check_regex_in_console(self, name, regex=' login: ', loop=6):
+ """Wait for specific message in console
+
+ Returns: True or False on errors
+ """
+ assert self.cloud
+ for iloop in range(loop):
+ console = self.cloud.get_server_console(name)
+ self.__logger.debug("console: \n%s", console)
+ if re.search(regex, console):
+ self.__logger.debug(
+ "regex found: '%s' in console\n%s", regex, console)
+ return True
+ self.__logger.debug(
+ "try %s: cannot find regex '%s' in console\n%s",
+ iloop + 1, regex, console)
+ time.sleep(10)
+ self.__logger.error("cannot find regex '%s' in console", regex)
+ return False
+
+ def clean_orphan_security_groups(self):
+ """Clean all security groups which are not owned by an existing tenant
+
+ It lists all orphan security groups in use as debug to avoid
+ misunderstanding the testcase results (it could happen if cloud admin
+ removes accounts without cleaning the virtual machines)
+ """
+ sec_groups = self.orig_cloud.list_security_groups()
+ for sec_group in sec_groups:
+ if not sec_group.tenant_id:
+ continue
+ if not self.orig_cloud.get_project(sec_group.tenant_id):
+ self.__logger.debug("Cleaning security group %s", sec_group.id)
+ try:
+ self.orig_cloud.delete_security_group(sec_group.id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.debug(
+ "Orphan security group %s in use", sec_group.id)
+
+ def count_hypervisors(self):
+ """Count hypervisors."""
+ if env.get('SKIP_DOWN_HYPERVISORS').lower() == 'false':
+ return len(self.orig_cloud.list_hypervisors())
+ return self.count_active_hypervisors()
+
+ def count_active_hypervisors(self):
+ """Count all hypervisors which are up."""
+ compute_cnt = 0
+ for hypervisor in self.orig_cloud.list_hypervisors():
+ if hypervisor['state'] == 'up':
+ compute_cnt += 1
+ else:
+ self.__logger.warning(
+ "%s is down", hypervisor['hypervisor_hostname'])
+ return compute_cnt
+
+ def run(self, **kwargs):
+ """Boot the new VM
+
+ Here are the main actions:
+ - publish the image
+ - create the flavor
+
+ Returns:
+ - TestCase.EX_OK
+ - TestCase.EX_RUN_ERROR on error
+ """
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ assert super().run(
+ **kwargs) == testcase.TestCase.EX_OK
+ self.image = self.publish_image()
+ self.flavor = self.create_flavor()
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ self.result = 0
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def clean(self):
+ try:
+ assert self.orig_cloud
+ assert self.cloud
+ super().clean()
+ if self.image:
+ self.cloud.delete_image(self.image.id)
+ if self.flavor:
+ self.orig_cloud.delete_flavor(self.flavor.id)
+ if env.get('CLEAN_ORPHAN_SECURITY_GROUPS').lower() == 'true':
+ self.clean_orphan_security_groups()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class VmReady2(VmReady1):
+ """Deploy a single VM reachable via ssh (scenario2)
+
+ It creates new user/project before creating and configuring all tenant
+ network resources, flavors, images, etc. required by advanced testcases.
+
+ It ensures that all testcases inheriting from SingleVm2 could work
+ without specific configurations (or at least read the same config data).
+ """
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'vmready2'
+ super().__init__(**kwargs)
+ try:
+ assert self.orig_cloud
+ self.project = tenantnetwork.NewProject(
+ self.orig_cloud, self.case_name, self.guid)
+ self.project.create()
+ self.cloud = self.project.cloud
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create user or project")
+ self.cloud = None
+ self.project = None
+
+ def clean(self):
+ try:
+ super().clean()
+ assert self.project
+ self.project.clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class SingleVm1(VmReady1):
+ """Deploy a single VM reachable via ssh (scenario1)
+
+ It inherits from TenantNetwork1 which creates all network resources and
+ completes it by booting a VM attached to that network.
+
+ It ensures that all testcases inheriting from SingleVm1 could work
+ without specific configurations (or at least read the same config data).
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ __logger = logging.getLogger(__name__)
+ username = 'cirros'
+ ssh_connect_timeout = 1
+ ssh_connect_loops = 6
+ create_floating_ip_timeout = 120
+ check_console_loop = 6
+ check_console_regex = ' login: '
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'singlevm1'
+ super().__init__(**kwargs)
+ self.sshvm = None
+ self.sec = None
+ self.fip = None
+ self.keypair = None
+ self.ssh = None
+ (_, self.key_filename) = tempfile.mkstemp()
+
+ def prepare(self):
+ """Create the security group and the keypair
+
+ It can be overriden to set other rules according to the services
+ running in the VM
+
+ Raises: Exception on error
+ """
+ assert self.cloud
+ self.keypair = self.cloud.create_keypair(
+ f'{self.case_name}-kp_{self.guid}')
+ self.__logger.debug("keypair: %s", self.keypair)
+ self.__logger.debug("private_key:\n%s", self.keypair.private_key)
+ with open(
+ self.key_filename, 'w', encoding='utf-8') as private_key_file:
+ private_key_file.write(self.keypair.private_key)
+ self.sec = self.cloud.create_security_group(
+ f'{self.case_name}-sg_{self.guid}',
+ f'created by OPNFV Functest ({self.case_name})')
+ self.cloud.create_security_group_rule(
+ self.sec.id, port_range_min='22', port_range_max='22',
+ protocol='tcp', direction='ingress')
+ self.cloud.create_security_group_rule(
+ self.sec.id, protocol='icmp', direction='ingress')
+
+ def connect(self, vm1):
+ """Connect to a virtual machine via ssh
+
+ It first adds a floating ip to the virtual machine and then establishes
+ the ssh connection.
+
+ Returns:
+ - (fip, ssh)
+ - None on error
+ """
+ assert vm1
+ fip = None
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ fip = self.cloud.create_floating_ip(
+ network=self.ext_net.id, server=vm1, wait=True,
+ timeout=self.create_floating_ip_timeout)
+ self.__logger.debug("floating_ip: %s", fip)
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
+ for loop in range(self.ssh_connect_loops):
+ try:
+ p_console = self.cloud.get_server_console(vm1)
+ self.__logger.debug("vm console: \n%s", p_console)
+ ssh.connect(
+ fip.floating_ip_address if fip else vm1.public_v4,
+ username=getattr(
+ config.CONF,
+ f'{self.case_name}_image_user', self.username),
+ key_filename=self.key_filename,
+ timeout=getattr(
+ config.CONF,
+ f'{self.case_name}_vm_ssh_connect_timeout',
+ self.ssh_connect_timeout))
+ break
+ except Exception as exc: # pylint: disable=broad-except
+ self.__logger.debug(
+ "try %s: cannot connect to %s: %s", loop + 1,
+ fip.floating_ip_address if fip else vm1.public_v4, exc)
+ time.sleep(9)
+ else:
+ self.__logger.error(
+ "cannot connect to %s", fip.floating_ip_address)
+ return None
+ return (fip, ssh)
+
+ def execute(self):
+ """Say hello world via ssh
+
+ It can be overriden to execute any command.
+
+ Returns: echo exit codes
+ """
+ (_, stdout, stderr) = self.ssh.exec_command('echo Hello World')
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
+ return stdout.channel.recv_exit_status()
+
+ def run(self, **kwargs):
+ """Boot the new VM
+
+ Here are the main actions:
+ - add a new ssh key
+ - boot the VM
+ - create the security group
+ - execute the right command over ssh
+
+ Returns:
+ - TestCase.EX_OK
+ - TestCase.EX_RUN_ERROR on error
+ """
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ assert super().run(
+ **kwargs) == testcase.TestCase.EX_OK
+ self.result = 0
+ self.prepare()
+ self.sshvm = self.boot_vm(
+ key_name=self.keypair.id, security_groups=[self.sec.id])
+ if self.check_regex_in_console(
+ self.sshvm.name, regex=self.check_console_regex,
+ loop=self.check_console_loop):
+ (self.fip, self.ssh) = self.connect(self.sshvm)
+ if not self.execute():
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def clean(self):
+ try:
+ assert self.orig_cloud
+ assert self.cloud
+ if self.fip:
+ self.cloud.delete_floating_ip(self.fip.id)
+ if self.sshvm:
+ self.cloud.delete_server(self.sshvm, wait=True)
+ if self.sec:
+ self.cloud.delete_security_group(self.sec.id)
+ if self.keypair:
+ self.cloud.delete_keypair(self.keypair.name)
+ super().clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class SingleVm2(SingleVm1):
+ """Deploy a single VM reachable via ssh (scenario2)
+
+ It creates new user/project before creating and configuring all tenant
+ network resources and vms required by advanced testcases.
+
+ It ensures that all testcases inheriting from SingleVm2 could work
+ without specific configurations (or at least read the same config data).
+ """
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'singlevm2'
+ super().__init__(**kwargs)
+ try:
+ assert self.orig_cloud
+ self.project = tenantnetwork.NewProject(
+ self.orig_cloud, self.case_name, self.guid)
+ self.project.create()
+ self.cloud = self.project.cloud
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create user or project")
+ self.cloud = None
+ self.project = None
+
+ def clean(self):
+ try:
+ super().clean()
+ assert self.project
+ self.project.clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
diff --git a/functest/core/tenantnetwork.py b/functest/core/tenantnetwork.py
new file mode 100644
index 000000000..3670dbe8a
--- /dev/null
+++ b/functest/core/tenantnetwork.py
@@ -0,0 +1,326 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Ease deploying tenant networks
+
+It offers a simple way to create all tenant network resources required by a
+testcase (including all Functest ones):
+
+ - TenantNetwork1 selects the user and the project set as env vars
+ - TenantNetwork2 creates a user and project to isolate the same resources
+
+This classes could be reused by more complexed scenarios (Single VM)
+"""
+
+import logging
+import os
+import time
+import uuid
+
+import os_client_config
+import shade
+from tempest.lib.common.utils import data_utils
+from xtesting.core import testcase
+
+from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
+
+
+class NewProject():
+ """Ease creating new projects/users"""
+ # pylint: disable=too-many-instance-attributes
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, cloud, case_name, guid):
+ self.cloud = None
+ self.orig_cloud = cloud
+ self.case_name = case_name
+ self.guid = guid
+ self.project = None
+ self.user = None
+ self.password = None
+ self.domain = None
+ self.role_name = None
+ self.default_member = env.get('NEW_USER_ROLE')
+
+ def create(self):
+ """Create projects/users"""
+ assert self.orig_cloud
+ assert self.case_name
+ self.password = data_utils.rand_password().replace('%', '!')
+ self.__logger.debug("password: %s", self.password)
+ self.domain = self.orig_cloud.get_domain(
+ name_or_id=self.orig_cloud.auth.get(
+ "project_domain_name", "Default"))
+ self.project = self.orig_cloud.create_project(
+ name=f'{self.case_name[:18]}-project_{self.guid}',
+ description=f"Created by OPNFV Functest: {self.case_name}",
+ domain_id=self.domain.id)
+ self.__logger.debug("project: %s", self.project)
+ self.user = self.orig_cloud.create_user(
+ name=f'{self.case_name}-user_{self.guid}',
+ password=self.password,
+ domain_id=self.domain.id)
+ self.__logger.debug("user: %s", self.user)
+ try:
+ if self.orig_cloud.get_role(self.default_member):
+ self.role_name = self.default_member
+ elif self.orig_cloud.get_role(self.default_member.lower()):
+ self.role_name = self.default_member.lower()
+ else:
+ raise Exception(f"Cannot detect {self.default_member}")
+ except Exception: # pylint: disable=broad-except
+ self.__logger.info("Creating default role %s", self.default_member)
+ role = self.orig_cloud.create_role(self.default_member)
+ self.role_name = role.name
+ self.__logger.debug("role: %s", role)
+ self.orig_cloud.grant_role(
+ self.role_name, user=self.user.id, project=self.project.id,
+ domain=self.domain.id)
+ osconfig = os_client_config.config.OpenStackConfig()
+ osconfig.cloud_config[
+ 'clouds']['envvars']['project_name'] = self.project.name
+ osconfig.cloud_config[
+ 'clouds']['envvars']['project_id'] = self.project.id
+ osconfig.cloud_config['clouds']['envvars']['username'] = self.user.name
+ osconfig.cloud_config['clouds']['envvars']['password'] = self.password
+ self.__logger.debug("cloud_config %s", osconfig.cloud_config)
+ self.cloud = shade.OpenStackCloud(
+ cloud_config=osconfig.get_one_cloud())
+ self.__logger.debug("new cloud %s", self.cloud.auth)
+
+ def get_environ(self):
+ "Get new environ"
+ environ = dict(
+ os.environ,
+ OS_USERNAME=self.user.name,
+ OS_PROJECT_NAME=self.project.name,
+ OS_PROJECT_ID=self.project.id,
+ OS_PASSWORD=self.password)
+ try:
+ del environ['OS_TENANT_NAME']
+ del environ['OS_TENANT_ID']
+ except Exception: # pylint: disable=broad-except
+ pass
+ return environ
+
+ def clean(self):
+ """Remove projects/users"""
+ try:
+ assert self.orig_cloud
+ if self.user:
+ self.orig_cloud.delete_user(self.user.id)
+ if self.project:
+ self.orig_cloud.delete_project(self.project.id)
+ secgroups = self.orig_cloud.list_security_groups(
+ filters={'name': 'default',
+ 'project_id': self.project.id})
+ if secgroups:
+ sec_id = secgroups[0].id
+ self.orig_cloud.delete_security_group(sec_id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
+
+
+class TenantNetwork1(testcase.TestCase):
+ # pylint: disable=too-many-instance-attributes
+ """Create a tenant network (scenario1)
+
+ It creates and configures all tenant network resources required by
+ advanced testcases (subnet, network and router).
+
+ It ensures that all testcases inheriting from TenantNetwork1 could work
+ without network specific configurations (or at least read the same config
+ data).
+ """
+
+ __logger = logging.getLogger(__name__)
+ cidr = '192.168.120.0/24'
+ shared_network = False
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'tenantnetwork1'
+ super().__init__(**kwargs)
+ self.dir_results = os.path.join(getattr(config.CONF, 'dir_results'))
+ self.res_dir = os.path.join(self.dir_results, self.case_name)
+ self.output_log_name = 'functest.log'
+ self.output_debug_log_name = 'functest.debug.log'
+ self.ext_net = None
+ try:
+ cloud_config = os_client_config.get_config()
+ self.cloud = self.orig_cloud = shade.OpenStackCloud(
+ cloud_config=cloud_config)
+ except Exception: # pylint: disable=broad-except
+ self.cloud = self.orig_cloud = None
+ self.__logger.exception("Cannot connect to Cloud")
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ try:
+ self.ext_net = self.get_external_network(self.cloud)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot get the external network")
+ self.guid = str(uuid.uuid4())
+ self.network = None
+ self.subnet = None
+ self.router = None
+
+ @staticmethod
+ def get_external_network(cloud):
+ """
+ Return the configured external network name or
+ the first retrieved external network name
+ """
+ assert cloud
+ if env.get("EXTERNAL_NETWORK"):
+ network = cloud.get_network(
+ env.get("EXTERNAL_NETWORK"), {"router:external": True})
+ if network:
+ return network
+ networks = cloud.list_networks({"router:external": True})
+ if networks:
+ return networks[0]
+ return None
+
+ @staticmethod
+ def get_default_role(cloud, member="Member"):
+ """Get the default role
+
+ It also tests the role in lowercase to avoid possible conflicts.
+ """
+ role = cloud.get_role(member)
+ if not role:
+ role = cloud.get_role(member.lower())
+ return role
+
+ @staticmethod
+ def get_public_auth_url(cloud):
+ """Get Keystone public endpoint"""
+ keystone_id = functest_utils.search_services(cloud, 'keystone')[0].id
+ endpoint = cloud.search_endpoints(
+ filters={'interface': 'public',
+ 'service_id': keystone_id})[0].url
+ return endpoint
+
+ def create_network_resources(self):
+ """Create all tenant network resources
+
+ It creates a router which gateway is the external network detected.
+ The new subnet is attached to that router.
+
+ Raises: expection on error
+ """
+ assert self.cloud
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ assert self.ext_net
+ provider = {}
+ if hasattr(config.CONF, f'{self.case_name}_network_type'):
+ provider["network_type"] = getattr(
+ config.CONF, f'{self.case_name}_network_type')
+ if hasattr(config.CONF, f'{self.case_name}_physical_network'):
+ provider["physical_network"] = getattr(
+ config.CONF, f'{self.case_name}_physical_network')
+ if hasattr(config.CONF, f'{self.case_name}_segmentation_id'):
+ provider["segmentation_id"] = getattr(
+ config.CONF, f'{self.case_name}_segmentation_id')
+ domain = self.orig_cloud.get_domain(
+ name_or_id=self.orig_cloud.auth.get(
+ "project_domain_name", "Default"))
+ project = self.orig_cloud.get_project(
+ self.cloud.auth['project_name'],
+ domain_id=domain.id)
+ self.network = self.orig_cloud.create_network(
+ f'{self.case_name}-net_{self.guid}',
+ provider=provider, project_id=project.id,
+ shared=self.shared_network)
+ self.__logger.debug("network: %s", self.network)
+
+ self.subnet = self.cloud.create_subnet(
+ self.network.id,
+ subnet_name=f'{self.case_name}-subnet_{self.guid}',
+ cidr=getattr(
+ config.CONF, f'{self.case_name}_private_subnet_cidr',
+ self.cidr),
+ enable_dhcp=True,
+ dns_nameservers=[env.get('NAMESERVER')])
+ self.__logger.debug("subnet: %s", self.subnet)
+
+ self.router = self.cloud.create_router(
+ name=f'{self.case_name}-router_{self.guid}',
+ ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
+ self.__logger.debug("router: %s", self.router)
+ self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)
+
+ def run(self, **kwargs):
+ status = testcase.TestCase.EX_RUN_ERROR
+ try:
+ assert self.cloud
+ self.start_time = time.time()
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ self.create_network_resources()
+ self.result = 100
+ status = testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception('Cannot run %s', self.case_name)
+ finally:
+ self.stop_time = time.time()
+ return status
+
+ def clean(self):
+ try:
+ assert self.cloud
+ if self.router:
+ if self.subnet:
+ self.cloud.remove_router_interface(
+ self.router, self.subnet.id)
+ self.cloud.delete_router(self.router.id)
+ if self.subnet:
+ self.cloud.delete_subnet(self.subnet.id)
+ if self.network:
+ self.cloud.delete_network(self.network.id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("cannot clean all resources")
+
+
+class TenantNetwork2(TenantNetwork1):
+ """Create a tenant network (scenario2)
+
+ It creates new user/project before creating and configuring all tenant
+ network resources required by a testcase (subnet, network and router).
+
+ It ensures that all testcases inheriting from TenantNetwork2 could work
+ without network specific configurations (or at least read the same config
+ data).
+ """
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'tenantnetwork2'
+ super().__init__(**kwargs)
+ try:
+ assert self.cloud
+ self.project = NewProject(
+ self.cloud, self.case_name, self.guid)
+ self.project.create()
+ self.cloud = self.project.cloud
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create user or project")
+ self.cloud = None
+ self.project = None
+
+ def clean(self):
+ try:
+ super().clean()
+ assert self.project
+ self.project.clean()
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot clean all resources")
diff --git a/functest/core/testcase.py b/functest/core/testcase.py
deleted file mode 100644
index fa3802872..000000000
--- a/functest/core/testcase.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class of all Functest TestCases."""
-
-import logging
-import os
-
-import functest.utils.functest_utils as ft_utils
-
-import prettytable
-
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-
-class TestCase(object):
- """Base model for single test case."""
-
- EX_OK = os.EX_OK
- """everything is OK"""
-
- EX_RUN_ERROR = os.EX_SOFTWARE
- """run() failed"""
-
- EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1
- """push_to_db() failed"""
-
- EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2
- """results are false"""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- self.details = {}
- self.project_name = kwargs.get('project_name', 'functest')
- self.case_name = kwargs.get('case_name', '')
- self.criteria = kwargs.get('criteria', 100)
- self.result = 0
- self.start_time = 0
- self.stop_time = 0
-
- def __str__(self):
- try:
- assert self.project_name
- assert self.case_name
- result = 'PASS' if(self.is_successful(
- ) == TestCase.EX_OK) else 'FAIL'
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['test case', 'project', 'duration',
- 'result'])
- msg.add_row([self.case_name, self.project_name,
- self.get_duration(), result])
- return msg.get_string()
- except AssertionError:
- self.__logger.error("We cannot print invalid objects")
- return super(TestCase, self).__str__()
-
- def get_duration(self):
- """Return the duration of the test case.
-
- Returns:
- duration if start_time and stop_time are set
- "XX:XX" otherwise.
- """
- try:
- assert self.start_time
- assert self.stop_time
- if self.stop_time < self.start_time:
- return "XX:XX"
- return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod(
- self.stop_time - self.start_time, 60))
- except Exception: # pylint: disable=broad-except
- self.__logger.error("Please run test before getting the duration")
- return "XX:XX"
-
- def is_successful(self):
- """Interpret the result of the test case.
-
- It allows getting the result of TestCase. It completes run()
- which only returns the execution status.
-
- It can be overriden if checking result is not suitable.
-
- Returns:
- TestCase.EX_OK if result is 'PASS'.
- TestCase.EX_TESTCASE_FAILED otherwise.
- """
- try:
- assert self.criteria
- assert self.result is not None
- if (not isinstance(self.result, str) and
- not isinstance(self.criteria, str)):
- if self.result >= self.criteria:
- return TestCase.EX_OK
- else:
- # Backward compatibility
- # It must be removed as soon as TestCase subclasses
- # stop setting result = 'PASS' or 'FAIL'.
- # In this case criteria is unread.
- self.__logger.warning(
- "Please update result which must be an int!")
- if self.result == 'PASS':
- return TestCase.EX_OK
- except AssertionError:
- self.__logger.error("Please run test before checking the results")
- return TestCase.EX_TESTCASE_FAILED
-
- def run(self, **kwargs):
- """Run the test case.
-
- It allows running TestCase and getting its execution
- status.
-
- The subclasses must override the default implementation which
- is false on purpose.
-
- The new implementation must set the following attributes to
- push the results to DB:
-
- * result,
- * start_time,
- * stop_time.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- TestCase.EX_RUN_ERROR.
- """
- # pylint: disable=unused-argument
- self.__logger.error("Run must be implemented")
- return TestCase.EX_RUN_ERROR
-
- def push_to_db(self):
- """Push the results of the test case to the DB.
-
- It allows publishing the results and to check the status.
-
- It could be overriden if the common implementation is not
- suitable. The following attributes must be set before pushing
- the results to DB:
-
- * project_name,
- * case_name,
- * result,
- * start_time,
- * stop_time.
-
- Returns:
- TestCase.EX_OK if results were pushed to DB.
- TestCase.EX_PUSH_TO_DB_ERROR otherwise.
- """
- try:
- assert self.project_name
- assert self.case_name
- assert self.start_time
- assert self.stop_time
- pub_result = 'PASS' if self.is_successful(
- ) == TestCase.EX_OK else 'FAIL'
- if ft_utils.push_results_to_db(
- self.project_name, self.case_name, self.start_time,
- self.stop_time, pub_result, self.details):
- self.__logger.info(
- "The results were successfully pushed to DB")
- return TestCase.EX_OK
- else:
- self.__logger.error("The results cannot be pushed to DB")
- return TestCase.EX_PUSH_TO_DB_ERROR
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("The results cannot be pushed to DB")
- return TestCase.EX_PUSH_TO_DB_ERROR
-
- def clean(self):
- """Clean the resources.
-
- It can be overriden if resources must be deleted after
- running the test case.
- """
diff --git a/functest/core/unit.py b/functest/core/unit.py
deleted file mode 100644
index 61b5a58d9..000000000
--- a/functest/core/unit.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cable Television Laboratories, Inc. and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class to run unittest.TestSuite as TestCase."""
-
-from __future__ import division
-
-import logging
-import time
-import unittest
-
-import six
-
-from functest.core import testcase
-
-__author__ = ("Steven Pisarski <s.pisarski@cablelabs.com>, "
- "Cedric Ollivier <cedric.ollivier@orange.com>")
-
-
-class Suite(testcase.TestCase):
- """Base model for running unittest.TestSuite."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- super(Suite, self).__init__(**kwargs)
- self.suite = None
-
- def run(self, **kwargs):
- """Run the test suite.
-
- It allows running any unittest.TestSuite and getting its
- execution status.
-
- By default, it runs the suite defined as instance attribute.
- It can be overriden by passing name as arg. It must
- conform with TestLoader.loadTestsFromName().
-
- It sets the following attributes required to push the results
- to DB:
-
- * result,
- * start_time,
- * stop_time,
- * details.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- TestCase.EX_OK if any TestSuite has been run,
- TestCase.EX_RUN_ERROR otherwise.
- """
- try:
- name = kwargs["name"]
- try:
- self.suite = unittest.TestLoader().loadTestsFromName(name)
- except ImportError:
- self.__logger.error("Can not import %s", name)
- return testcase.TestCase.EX_RUN_ERROR
- except KeyError:
- pass
- try:
- assert self.suite
- self.start_time = time.time()
- stream = six.StringIO()
- result = unittest.TextTestRunner(
- stream=stream, verbosity=2).run(self.suite)
- self.__logger.debug("\n\n%s", stream.getvalue())
- self.stop_time = time.time()
- self.details = {
- "testsRun": result.testsRun,
- "failures": len(result.failures),
- "errors": len(result.errors),
- "stream": stream.getvalue()}
- self.result = 100 * (
- (result.testsRun - (len(result.failures) +
- len(result.errors))) /
- result.testsRun)
- return testcase.TestCase.EX_OK
- except AssertionError:
- self.__logger.error("No suite is defined")
- return testcase.TestCase.EX_RUN_ERROR
- except ZeroDivisionError:
- self.__logger.error("No test has been run")
- return testcase.TestCase.EX_RUN_ERROR
diff --git a/functest/core/vnf.py b/functest/core/vnf.py
deleted file mode 100644
index 73aaf446e..000000000
--- a/functest/core/vnf.py
+++ /dev/null
@@ -1,206 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class of all VNF TestCases."""
-
-import logging
-import time
-
-import functest.core.testcase as base
-from functest.utils.constants import CONST
-from snaps.config.user import UserConfig
-from snaps.config.project import ProjectConfig
-from snaps.openstack.create_user import OpenStackUser
-from snaps.openstack.create_project import OpenStackProject
-from snaps.openstack.tests import openstack_tests
-
-__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, "
- "Valentin Boucher <valentin.boucher@orange.com>")
-
-
-class VnfPreparationException(Exception):
- """Raise when VNF preparation cannot be executed."""
-
-
-class OrchestratorDeploymentException(Exception):
- """Raise when orchestrator cannot be deployed."""
-
-
-class VnfDeploymentException(Exception):
- """Raise when VNF cannot be deployed."""
-
-
-class VnfTestException(Exception):
- """Raise when VNF cannot be tested."""
-
-
-class VnfOnBoarding(base.TestCase):
- """Base model for VNF test cases."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- super(VnfOnBoarding, self).__init__(**kwargs)
- self.tenant_name = CONST.__getattribute__(
- 'vnf_{}_tenant_name'.format(self.case_name))
- self.snaps_creds = {}
- self.created_object = []
- self.os_project = None
-
- def run(self, **kwargs):
- """
- Run of the VNF test case:
-
- * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...),
- * Deploy the VNF,
- * Perform tests on the VNF
-
- A VNF test case is successfull when the 3 steps are PASS
- If one of the step is FAIL, the test case is FAIL
-
- Returns:
- TestCase.EX_OK if result is 'PASS'.
- TestCase.EX_TESTCASE_FAILED otherwise.
- """
- self.start_time = time.time()
-
- try:
- self.prepare()
- if (self.deploy_orchestrator() and
- self.deploy_vnf() and
- self.test_vnf()):
- self.stop_time = time.time()
- # Calculation with different weight depending on the steps TODO
- self.result = 100
- return base.TestCase.EX_OK
- else:
- self.result = 0
- self.stop_time = time.time()
- return base.TestCase.EX_TESTCASE_FAILED
- except Exception: # pylint: disable=broad-except
- self.stop_time = time.time()
- self.__logger.exception("Exception on VNF testing")
- return base.TestCase.EX_TESTCASE_FAILED
-
- def prepare(self):
- """
- Prepare the environment for VNF testing:
-
- * Creation of a user,
- * Creation of a tenant,
- * Allocation admin role to the user on this tenant
-
- Returns base.TestCase.EX_OK if preparation is successfull
-
- Raise VnfPreparationException in case of problem
- """
- try:
- tenant_description = CONST.__getattribute__(
- 'vnf_{}_tenant_description'.format(self.case_name))
- self.__logger.info("Prepare VNF: %s, description: %s",
- self.tenant_name, tenant_description)
- snaps_creds = openstack_tests.get_credentials(
- os_env_file=CONST.__getattribute__('openstack_creds'))
-
- project_creator = OpenStackProject(
- snaps_creds,
- ProjectConfig(
- name=self.tenant_name,
- description=tenant_description
- ))
- project_creator.create()
- self.created_object.append(project_creator)
- self.os_project = project_creator
-
- user_creator = OpenStackUser(
- snaps_creds,
- UserConfig(
- name=self.tenant_name,
- password=self.tenant_name,
- roles={'admin': self.tenant_name}))
-
- user_creator.create()
- self.created_object.append(user_creator)
-
- self.snaps_creds = user_creator.get_os_creds(self.tenant_name)
-
- return base.TestCase.EX_OK
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Exception raised during VNF preparation")
- raise VnfPreparationException
-
- def deploy_orchestrator(self):
- """
- Deploy an orchestrator (optional).
-
- If this method is overriden then raise orchestratorDeploymentException
- if error during orchestrator deployment
- """
- self.__logger.info("Deploy orchestrator (if necessary)")
- return True
-
- def deploy_vnf(self):
- """
- Deploy the VNF
-
- This function MUST be implemented by vnf test cases.
- The details section MAY be updated in the vnf test cases.
-
- The deployment can be executed via a specific orchestrator
- or using build-in orchestrators such as heat, OpenBaton, cloudify,
- juju, onap, ...
-
- Returns:
- True if the VNF is properly deployed
- False if the VNF is not deployed
-
- Raise VnfDeploymentException if error during VNF deployment
- """
- self.__logger.error("VNF must be deployed")
- raise VnfDeploymentException
-
- def test_vnf(self):
- """
- Test the VNF
-
- This function MUST be implemented by vnf test cases.
- The details section MAY be updated in the vnf test cases.
-
- Once a VNF is deployed, it is assumed that specific test suite can be
- run to validate the VNF.
- Please note that the same test suite can be used on several test case
- (e.g. clearwater test suite can be used whatever the orchestrator used
- for the deployment)
-
- Returns:
- True if VNF tests are PASS
- False if test suite is FAIL
-
- Raise VnfTestException if error during VNF test
- """
- self.__logger.error("VNF must be tested")
- raise VnfTestException
-
- def clean(self):
- """
- Clean VNF test case.
-
- It is up to the test providers to delete resources used for the tests.
- By default we clean:
-
- * the user,
- * the tenant
- """
- self.__logger.info("test cleaning")
- self.__logger.info('Remove the cloudify manager OS object ..')
- for creator in reversed(self.created_object):
- try:
- creator.clean()
- except Exception as exc: # pylint: disable=broad-except
- self.__logger.error('Unexpected error cleaning - %s', exc)