diff options
author | Cédric Ollivier <cedric.ollivier@orange.com> | 2018-02-28 09:35:49 +0100 |
---|---|---|
committer | Cédric Ollivier <cedric.ollivier@orange.com> | 2018-02-28 09:36:32 +0100 |
commit | 2aab5c48df64b044ab9bae6e883e6e0acaabbf52 (patch) | |
tree | c82294952795b3953130bf624929d6ecae3e4fcf /functest | |
parent | baa8f2d5f67d45e5761f92cb93fe22050f08d0fe (diff) |
Rename all Functest refs to Xtesting
It mainly renames python modules and then the related documentation
config files.
Change-Id: I186010bb88d3d39afe7b8fd1ebcef9c690cc1282
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
Diffstat (limited to 'functest')
36 files changed, 0 insertions, 4246 deletions
diff --git a/functest/__init__.py b/functest/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/__init__.py +++ /dev/null diff --git a/functest/ci/__init__.py b/functest/ci/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/ci/__init__.py +++ /dev/null diff --git a/functest/ci/logging.ini b/functest/ci/logging.ini deleted file mode 100644 index f1ab7241..00000000 --- a/functest/ci/logging.ini +++ /dev/null @@ -1,80 +0,0 @@ -[loggers] -keys=root,functest,api,ci,cli,core,energy,opnfv_tests,utils - -[handlers] -keys=console,wconsole,file,null - -[formatters] -keys=standard - -[logger_root] -level=NOTSET -handlers=null - -[logger_functest] -level=NOTSET -handlers=file -qualname=functest - -[logger_api] -level=NOTSET -handlers=wconsole -qualname=functest.api - -[logger_ci] -level=NOTSET -handlers=console -qualname=functest.ci - -[logger_cli] -level=NOTSET -handlers=wconsole -qualname=functest.cli - -[logger_core] -level=NOTSET -handlers=console -qualname=functest.core - -[logger_energy] -level=NOTSET -handlers=wconsole -qualname=functest.energy - -[logger_opnfv_tests] -level=NOTSET -handlers=wconsole -qualname=functest.opnfv_tests - -[logger_utils] -level=NOTSET -handlers=wconsole -qualname=functest.utils - -[handler_null] -class=NullHandler -level=NOTSET -formatter=standard -args=() - -[handler_console] -class=StreamHandler -level=INFO -formatter=standard -args=(sys.stdout,) - -[handler_wconsole] -class=StreamHandler -level=WARN -formatter=standard -args=(sys.stdout,) - -[handler_file] -class=FileHandler -level=DEBUG -formatter=standard -args=("/home/opnfv/functest/results/functest.log",) - -[formatter_standard] -format=%(asctime)s - %(name)s - %(levelname)s - %(message)s -datefmt= diff --git a/functest/ci/run_tests.py b/functest/ci/run_tests.py deleted file mode 100644 index 651a3851..00000000 --- a/functest/ci/run_tests.py +++ /dev/null @@ -1,302 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Ericsson AB and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -""" The entry of running tests: -1) Parses functest/ci/testcases.yaml to check which testcase(s) to be run -2) Execute the common operations on every testcase (run, push results to db...) -3) Return the right status code -""" - -import argparse -import importlib -import logging -import logging.config -import os -import re -import sys -import textwrap -import pkg_resources - -import enum -import prettytable -import yaml - -from functest.ci import tier_builder -from functest.core import testcase -from functest.utils import constants -from functest.utils import env - -LOGGER = logging.getLogger('functest.ci.run_tests') - - -class Result(enum.Enum): - """The overall result in enumerated type""" - # pylint: disable=too-few-public-methods - EX_OK = os.EX_OK - EX_ERROR = -1 - - -class BlockingTestFailed(Exception): - """Exception when the blocking test fails""" - pass - - -class TestNotEnabled(Exception): - """Exception when the test is not enabled""" - pass - - -class RunTestsParser(object): - """Parser to run tests""" - # pylint: disable=too-few-public-methods - - def __init__(self): - self.parser = argparse.ArgumentParser() - self.parser.add_argument("-t", "--test", dest="test", action='store', - help="Test case or tier (group of tests) " - "to be executed. It will run all the test " - "if not specified.") - self.parser.add_argument("-n", "--noclean", help="Do not clean " - "OpenStack resources after running each " - "test (default=false).", - action="store_true") - self.parser.add_argument("-r", "--report", help="Push results to " - "database (default=false).", - action="store_true") - - def parse_args(self, argv=None): - """Parse arguments. - - It can call sys.exit if arguments are incorrect. - - Returns: - the arguments from cmdline - """ - return vars(self.parser.parse_args(argv)) - - -class Runner(object): - """Runner class""" - - def __init__(self): - self.executed_test_cases = {} - self.overall_result = Result.EX_OK - self.clean_flag = True - self.report_flag = False - self.tiers = tier_builder.TierBuilder( - env.get('INSTALLER_TYPE'), - env.get('DEPLOY_SCENARIO'), - pkg_resources.resource_filename('functest', 'ci/testcases.yaml')) - - @staticmethod - def source_envfile(rc_file=constants.ENV_FILE): - """Source the env file passed as arg""" - if not os.path.isfile(rc_file): - LOGGER.debug("No env file %s found", rc_file) - return - with open(rc_file, "r") as rcfd: - for line in rcfd: - var = (line.rstrip('"\n').replace('export ', '').split( - "=") if re.search(r'(.*)=(.*)', line) else None) - # The two next lines should be modified as soon as rc_file - # conforms with common rules. Be aware that it could induce - # issues if value starts with ' - if var: - key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0]) - value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:])) - os.environ[key] = value - rcfd.seek(0, 0) - LOGGER.info("Sourcing env file %s\n\n%s", rc_file, rcfd.read()) - - @staticmethod - def get_dict_by_test(testname): - # pylint: disable=bad-continuation,missing-docstring - with open(pkg_resources.resource_filename( - 'functest', 'ci/testcases.yaml')) as tyaml: - testcases_yaml = yaml.safe_load(tyaml) - for dic_tier in testcases_yaml.get("tiers"): - for dic_testcase in dic_tier['testcases']: - if dic_testcase['case_name'] == testname: - return dic_testcase - LOGGER.error('Project %s is not defined in testcases.yaml', testname) - return None - - @staticmethod - def get_run_dict(testname): - """Obtain the 'run' block of the testcase from testcases.yaml""" - try: - dic_testcase = Runner.get_dict_by_test(testname) - if not dic_testcase: - LOGGER.error("Cannot get %s's config options", testname) - elif 'run' in dic_testcase: - return dic_testcase['run'] - return None - except Exception: # pylint: disable=broad-except - LOGGER.exception("Cannot get %s's config options", testname) - return None - - def run_test(self, test): - """Run one test case""" - if not test.is_enabled(): - raise TestNotEnabled( - "The test case {} is not enabled".format(test.get_name())) - LOGGER.info("Running test case '%s'...", test.get_name()) - result = testcase.TestCase.EX_RUN_ERROR - run_dict = self.get_run_dict(test.get_name()) - if run_dict: - try: - module = importlib.import_module(run_dict['module']) - cls = getattr(module, run_dict['class']) - test_dict = Runner.get_dict_by_test(test.get_name()) - test_case = cls(**test_dict) - self.executed_test_cases[test.get_name()] = test_case - try: - kwargs = run_dict['args'] - test_case.run(**kwargs) - except KeyError: - test_case.run() - if self.report_flag: - test_case.push_to_db() - if test.get_project() == "functest": - result = test_case.is_successful() - else: - result = testcase.TestCase.EX_OK - LOGGER.info("Test result:\n\n%s\n", test_case) - if self.clean_flag: - test_case.clean() - except ImportError: - LOGGER.exception("Cannot import module %s", run_dict['module']) - except AttributeError: - LOGGER.exception("Cannot get class %s", run_dict['class']) - else: - raise Exception("Cannot import the class for the test case.") - return result - - def run_tier(self, tier): - """Run one tier""" - tier_name = tier.get_name() - tests = tier.get_tests() - if not tests: - LOGGER.info("There are no supported test cases in this tier " - "for the given scenario") - self.overall_result = Result.EX_ERROR - else: - LOGGER.info("Running tier '%s'", tier_name) - for test in tests: - self.run_test(test) - test_case = self.executed_test_cases[test.get_name()] - if test_case.is_successful() != testcase.TestCase.EX_OK: - LOGGER.error("The test case '%s' failed.", test.get_name()) - if test.get_project() == "functest": - self.overall_result = Result.EX_ERROR - if test.is_blocking(): - raise BlockingTestFailed( - "The test case {} failed and is blocking".format( - test.get_name())) - return self.overall_result - - def run_all(self): - """Run all available testcases""" - tiers_to_run = [] - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['tiers', 'order', 'CI Loop', 'description', - 'testcases']) - for tier in self.tiers.get_tiers(): - ci_loop = env.get('CI_LOOP') - if (tier.get_tests() and - re.search(ci_loop, tier.get_ci_loop()) is not None): - tiers_to_run.append(tier) - msg.add_row([tier.get_name(), tier.get_order(), - tier.get_ci_loop(), - textwrap.fill(tier.description, width=40), - textwrap.fill(' '.join([str(x.get_name( - )) for x in tier.get_tests()]), width=40)]) - LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg) - for tier in tiers_to_run: - self.run_tier(tier) - - def main(self, **kwargs): - """Entry point of class Runner""" - if 'noclean' in kwargs: - self.clean_flag = not kwargs['noclean'] - if 'report' in kwargs: - self.report_flag = kwargs['report'] - try: - LOGGER.info("Deployment description:\n\n%s\n", env.string()) - self.source_envfile() - if 'test' in kwargs: - LOGGER.debug("Test args: %s", kwargs['test']) - if self.tiers.get_tier(kwargs['test']): - self.run_tier(self.tiers.get_tier(kwargs['test'])) - elif self.tiers.get_test(kwargs['test']): - result = self.run_test( - self.tiers.get_test(kwargs['test'])) - if result != testcase.TestCase.EX_OK: - LOGGER.error("The test case '%s' failed.", - kwargs['test']) - self.overall_result = Result.EX_ERROR - elif kwargs['test'] == "all": - self.run_all() - else: - LOGGER.error("Unknown test case or tier '%s', or not " - "supported by the given scenario '%s'.", - kwargs['test'], - env.get('DEPLOY_SCENARIO')) - LOGGER.debug("Available tiers are:\n\n%s", - self.tiers) - return Result.EX_ERROR - else: - self.run_all() - except BlockingTestFailed: - pass - except Exception: # pylint: disable=broad-except - LOGGER.exception("Failures when running testcase(s)") - self.overall_result = Result.EX_ERROR - if not self.tiers.get_test(kwargs['test']): - self.summary(self.tiers.get_tier(kwargs['test'])) - LOGGER.info("Execution exit value: %s", self.overall_result) - return self.overall_result - - def summary(self, tier=None): - """To generate functest report showing the overall results""" - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'project', 'tier', - 'duration', 'result']) - tiers = [tier] if tier else self.tiers.get_tiers() - for each_tier in tiers: - for test in each_tier.get_tests(): - try: - test_case = self.executed_test_cases[test.get_name()] - except KeyError: - msg.add_row([test.get_name(), test.get_project(), - each_tier.get_name(), "00:00", "SKIP"]) - else: - result = 'PASS' if(test_case.is_successful( - ) == test_case.EX_OK) else 'FAIL' - msg.add_row( - [test_case.case_name, test_case.project_name, - self.tiers.get_tier_name(test_case.case_name), - test_case.get_duration(), result]) - for test in each_tier.get_skipped_test(): - msg.add_row([test.get_name(), test.get_project(), - each_tier.get_name(), "00:00", "SKIP"]) - LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg) - - -def main(): - """Entry point""" - logging.config.fileConfig(pkg_resources.resource_filename( - 'functest', 'ci/logging.ini')) - logging.captureWarnings(True) - parser = RunTestsParser() - args = parser.parse_args(sys.argv[1:]) - runner = Runner() - return runner.main(**args).value diff --git a/functest/ci/testcases.yaml b/functest/ci/testcases.yaml deleted file mode 100644 index 953d2aea..00000000 --- a/functest/ci/testcases.yaml +++ /dev/null @@ -1,424 +0,0 @@ ---- -tiers: - - - name: healthcheck - order: 0 - ci_loop: '(daily)|(weekly)' - description: >- - First tier to be executed to verify the basic - operations in the VIM. - testcases: - - - case_name: connection_check - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies the retrieval of OpenStack clients: - Keystone, Glance, Neutron and Nova and may perform some - simple queries. When the config value of - snaps.use_keystone is True, functest must have access to - the cloud's private network. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: - 'functest.opnfv_tests.openstack.snaps.connection_check' - class: 'ConnectionCheck' - - - - case_name: api_check - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies the retrieval of OpenStack clients: - Keystone, Glance, Neutron and Nova and may perform some - simple queries. When the config value of - snaps.use_keystone is True, functest must have access to - the cloud's private network. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '^((?!lxd).)*$' - run: - module: 'functest.opnfv_tests.openstack.snaps.api_check' - class: 'ApiCheck' - - - - case_name: snaps_health_check - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case creates executes the SimpleHealthCheck - Python test class which creates an, image, flavor, network, - and Cirros VM instance and observes the console output to - validate the single port obtains the correct IP address. - dependencies: - installer: '' - scenario: '^((?!lxd).)*$' - run: - module: 'functest.opnfv_tests.openstack.snaps.health_check' - class: 'HealthCheck' - - - - name: smoke - order: 1 - ci_loop: '(daily)|(weekly)' - description: >- - Set of basic Functional tests to validate the OPNFV scenarios. - testcases: - - - case_name: vping_ssh - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies: 1) SSH to an instance using - floating IPs over the public network. 2) Connectivity - between 2 instances over a private network. - dependencies: - installer: '' - scenario: '^((?!odl_l3|odl-bgpvpn|gluon).)*$' - run: - module: 'functest.opnfv_tests.openstack.vping.vping_ssh' - class: 'VPingSSH' - - - - case_name: vping_userdata - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies: 1) Boot a VM with given userdata. - 2) Connectivity between 2 instances over a private network. - dependencies: - installer: '' - scenario: '^((?!lxd).)*$' - run: - module: - 'functest.opnfv_tests.openstack.vping.vping_userdata' - class: 'VPingUserdata' - - - - case_name: tempest_smoke_serial - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs the smoke subset of the OpenStack - Tempest suite. The list of test cases is generated by - Tempest automatically and depends on the parameters of - the OpenStack deplopyment. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.tempest.tempest' - class: 'TempestSmokeSerial' - - - - case_name: rally_sanity - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs a sub group of tests of the OpenStack - Rally suite in smoke mode. - dependencies: - installer: '' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.rally.rally' - class: 'RallySanity' - - - - case_name: refstack_defcore - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs a sub group of tests of the OpenStack - Defcore testcases by using refstack client. - dependencies: - installer: '' - scenario: '' - run: - module: - 'functest.opnfv_tests.openstack.refstack_client.refstack_client' - class: 'RefstackClient' - - - - case_name: odl - project_name: functest - criteria: 100 - blocking: false - description: >- - Test Suite for the OpenDaylight SDN Controller. It - integrates some test suites from upstream using - Robot as the test framework. - dependencies: - installer: '' - scenario: 'odl' - run: - module: 'functest.opnfv_tests.sdn.odl.odl' - class: 'ODLTests' - args: - suites: - - /src/odl_test/csit/suites/integration/basic - - /src/odl_test/csit/suites/openstack/neutron - - - - case_name: odl_netvirt - project_name: functest - criteria: 100 - blocking: false - description: >- - Test Suite for the OpenDaylight SDN Controller when - the NetVirt features are installed. It integrates - some test suites from upstream using Robot as the - test framework. - dependencies: - installer: 'apex' - scenario: 'os-odl_l3-nofeature' - run: - module: 'functest.opnfv_tests.sdn.odl.odl' - class: 'ODLTests' - args: - suites: - - /src/odl_test/csit/suites/integration/basic - - /src/odl_test/csit/suites/openstack/neutron - - /src/odl_test/csit/suites/openstack/connectivity - - - - case_name: snaps_smoke - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case contains tests that setup and destroy - environments with VMs with and without Floating IPs - with a newly created user and project. Set the config - value snaps.use_floating_ips (True|False) to toggle - this functionality. When the config value of - snaps.use_keystone is True, functest must have access to - the cloud's private network. - - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '^((?!lxd).)*$' - run: - module: 'functest.opnfv_tests.openstack.snaps.smoke' - class: 'SnapsSmoke' - - - - name: features - order: 2 - ci_loop: '(daily)|(weekly)' - description: >- - Test suites from feature projects - integrated in functest - testcases: - - - case_name: doctor-notification - project_name: doctor - criteria: 100 - blocking: false - description: >- - Test suite from Doctor project. - dependencies: - installer: 'apex' - scenario: '^((?!fdio).)*$' - run: - module: 'functest.core.feature' - class: 'BashFeature' - args: - cmd: 'doctor-test' - - - - case_name: bgpvpn - project_name: sdnvpn - criteria: 100 - blocking: false - description: >- - Test suite from SDNVPN project. - dependencies: - installer: '(fuel)|(apex)|(netvirt)' - scenario: 'bgpvpn' - run: - module: 'sdnvpn.test.functest.run_sdnvpn_tests' - class: 'SdnvpnFunctest' - - - - case_name: functest-odl-sfc - project_name: sfc - criteria: 100 - blocking: false - description: >- - Test suite for odl-sfc to test two chains with one SF and - one chain with two SFs - dependencies: - installer: '' - scenario: 'odl.*sfc' - run: - module: 'functest.core.feature' - class: 'BashFeature' - args: - cmd: 'run_sfc_tests.py' - - - - case_name: barometercollectd - project_name: barometer - criteria: 100 - blocking: false - description: >- - Test suite for the Barometer project. Separate tests verify - the proper configuration and basic functionality of all the - collectd plugins as described in the Project Release Plan - dependencies: - installer: 'apex' - scenario: 'bar' - run: - module: 'baro_tests.barometer' - class: 'BarometerCollectd' - - - - case_name: fds - project_name: fastdatastacks - criteria: 100 - blocking: false - description: >- - Test Suite for the OpenDaylight SDN Controller when GBP - features are installed. It integrates some test suites from - upstream using Robot as the test framework. - dependencies: - installer: 'apex' - scenario: 'odl.*-fdio' - run: - module: 'functest.opnfv_tests.sdn.odl.odl' - class: 'ODLTests' - args: - suites: - - /src/fds/testing/robot - - - - name: components - order: 3 - ci_loop: 'weekly' - description: >- - Extensive testing of OpenStack API. - testcases: - - - case_name: tempest_full_parallel - project_name: functest - criteria: 80 - blocking: false - description: >- - The list of test cases is generated by - Tempest automatically and depends on the parameters of - the OpenStack deplopyment. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.tempest.tempest' - class: 'TempestFullParallel' - - - - case_name: rally_full - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs the full suite of scenarios of the - OpenStack Rally suite using several threads and iterations. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.rally.rally' - class: 'RallyFull' - - - - name: vnf - order: 4 - ci_loop: '(daily)|(weekly)' - description: >- - Collection of VNF test cases. - testcases: - - - case_name: cloudify_ims - project_name: functest - criteria: 80 - blocking: false - description: >- - This test case deploys an OpenSource vIMS solution from - Clearwater using the Cloudify orchestrator. It also runs - some signaling traffic. - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.ims.cloudify_ims' - class: 'CloudifyIms' - - - - case_name: vyos_vrouter - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case is vRouter testing. - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.router.cloudify_vrouter' - class: 'CloudifyVrouter' - - - - case_name: orchestra_openims - project_name: orchestra - enabled: false - criteria: 100 - blocking: false - description: >- - OpenIMS VNF deployment with Open Baton (Orchestra) - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.ims.orchestra_openims' - class: 'OpenImsVnf' - - - - case_name: orchestra_clearwaterims - project_name: orchestra - enabled: false - criteria: 100 - blocking: false - description: >- - ClearwaterIMS VNF deployment with Open Baton (Orchestra) - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: - 'functest.opnfv_tests.vnf.ims.orchestra_clearwaterims' - class: 'ClearwaterImsVnf' - - - - case_name: juju_epc - project_name: functest - criteria: 100 - blocking: false - description: >- - vEPC validation with Juju as VNF manager and ABoT as test - executor. - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.epc.juju_epc' - class: 'JujuEpc' diff --git a/functest/ci/tier_builder.py b/functest/ci/tier_builder.py deleted file mode 100644 index 370ab94d..00000000 --- a/functest/ci/tier_builder.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Ericsson AB and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""TierBuilder class to parse testcases config file""" - -import yaml - -import functest.ci.tier_handler as th - - -class TierBuilder(object): - # pylint: disable=missing-docstring - - def __init__(self, ci_installer, ci_scenario, testcases_file): - self.ci_installer = ci_installer - self.ci_scenario = ci_scenario - self.testcases_file = testcases_file - self.dic_tier_array = None - self.tier_objects = [] - self.testcases_yaml = None - self.generate_tiers() - - def read_test_yaml(self): - with open(self.testcases_file) as tc_file: - self.testcases_yaml = yaml.safe_load(tc_file) - - self.dic_tier_array = [] - for tier in self.testcases_yaml.get("tiers"): - self.dic_tier_array.append(tier) - - def generate_tiers(self): - if self.dic_tier_array is None: - self.read_test_yaml() - - del self.tier_objects[:] - for dic_tier in self.dic_tier_array: - tier = th.Tier( - name=dic_tier['name'], order=dic_tier['order'], - ci_loop=dic_tier['ci_loop'], - description=dic_tier['description']) - - for dic_testcase in dic_tier['testcases']: - installer = dic_testcase['dependencies']['installer'] - scenario = dic_testcase['dependencies']['scenario'] - dep = th.Dependency(installer, scenario) - - testcase = th.TestCase( - name=dic_testcase['case_name'], - enabled=dic_testcase.get('enabled', True), - dependency=dep, criteria=dic_testcase['criteria'], - blocking=dic_testcase['blocking'], - description=dic_testcase['description'], - project=dic_testcase['project_name']) - if (testcase.is_compatible(self.ci_installer, - self.ci_scenario) and - testcase.is_enabled()): - tier.add_test(testcase) - else: - tier.skip_test(testcase) - - self.tier_objects.append(tier) - - def get_tiers(self): - return self.tier_objects - - def get_tier_names(self): - tier_names = [] - for tier in self.tier_objects: - tier_names.append(tier.get_name()) - return tier_names - - def get_tier(self, tier_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].get_name() == tier_name: - return self.tier_objects[i] - return None - - def get_tier_name(self, test_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].is_test(test_name): - return self.tier_objects[i].name - return None - - def get_test(self, test_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].is_test(test_name): - return self.tier_objects[i].get_test(test_name) - return None - - def get_tests(self, tier_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].get_name() == tier_name: - return self.tier_objects[i].get_tests() - return None - - def __str__(self): - output = "" - for i in range(0, len(self.tier_objects)): - output += str(self.tier_objects[i]) + "\n" - return output diff --git a/functest/ci/tier_handler.py b/functest/ci/tier_handler.py deleted file mode 100644 index 9fc3f24d..00000000 --- a/functest/ci/tier_handler.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Ericsson AB and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Tier and TestCase classes to wrap the testcases config file""" -# pylint: disable=missing-docstring - -import re -import textwrap - -import prettytable - - -LINE_LENGTH = 72 - - -def split_text(text, max_len): - words = text.split() - lines = [] - line = "" - for word in words: - if len(line) + len(word) < max_len - 1: - line += word + " " - else: - lines.append(line) - line = word + " " - if line != "": - lines.append(line) - return lines - - -class Tier(object): - - def __init__(self, name, order, ci_loop, description=""): - self.tests_array = [] - self.skipped_tests_array = [] - self.name = name - self.order = order - self.ci_loop = ci_loop - self.description = description - - def add_test(self, testcase): - self.tests_array.append(testcase) - - def skip_test(self, testcase): - self.skipped_tests_array.append(testcase) - - def get_tests(self): - array_tests = [] - for test in self.tests_array: - array_tests.append(test) - return array_tests - - def get_skipped_test(self): - return self.skipped_tests_array - - def get_test_names(self): - array_tests = [] - for test in self.tests_array: - array_tests.append(test.get_name()) - return array_tests - - def get_test(self, test_name): - if self.is_test(test_name): - for test in self.tests_array: - if test.get_name() == test_name: - return test - return None - - def is_test(self, test_name): - for test in self.tests_array: - if test.get_name() == test_name: - return True - return False - - def get_name(self): - return self.name - - def get_order(self): - return self.order - - def get_ci_loop(self): - return self.ci_loop - - def __str__(self): - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['tiers', 'order', 'CI Loop', 'description', - 'testcases']) - msg.add_row( - [self.name, self.order, self.ci_loop, - textwrap.fill(self.description, width=40), - textwrap.fill(' '.join([str(x.get_name( - )) for x in self.get_tests()]), width=40)]) - return msg.get_string() - - -class TestCase(object): - - def __init__(self, name, enabled, dependency, criteria, blocking, - description="", project=""): - # pylint: disable=too-many-arguments - self.name = name - self.enabled = enabled - self.dependency = dependency - self.criteria = criteria - self.blocking = blocking - self.description = description - self.project = project - - @staticmethod - def is_none(item): - return item is None or item == "" - - def is_compatible(self, ci_installer, ci_scenario): - try: - if not self.is_none(ci_installer): - if re.search(self.dependency.get_installer(), - ci_installer) is None: - return False - if not self.is_none(ci_scenario): - if re.search(self.dependency.get_scenario(), - ci_scenario) is None: - return False - return True - except TypeError: - return False - - def get_name(self): - return self.name - - def is_enabled(self): - return self.enabled - - def get_criteria(self): - return self.criteria - - def is_blocking(self): - return self.blocking - - def get_project(self): - return self.project - - def __str__(self): - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'description', 'criteria', 'dependency']) - msg.add_row([self.name, textwrap.fill(self.description, width=40), - self.criteria, self.dependency]) - return msg.get_string() - - -class Dependency(object): - - def __init__(self, installer, scenario): - self.installer = installer - self.scenario = scenario - - def get_installer(self): - return self.installer - - def get_scenario(self): - return self.scenario - - def __str__(self): - delimitator = "\n" if self.get_installer( - ) and self.get_scenario() else "" - return "{}{}{}".format(self.get_installer(), delimitator, - self.get_scenario()) diff --git a/functest/core/__init__.py b/functest/core/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/core/__init__.py +++ /dev/null diff --git a/functest/core/feature.py b/functest/core/feature.py deleted file mode 100644 index 65fd5a08..00000000 --- a/functest/core/feature.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 ZTE Corp and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent classes of all Functest Features. - -Feature is considered as TestCase offered by Third-party. It offers -helpers to run any python method or any bash command. -""" - -import logging -import subprocess -import time - -import functest.core.testcase as base - -__author__ = ("Serena Feng <feng.xiaowei@zte.com.cn>, " - "Cedric Ollivier <cedric.ollivier@orange.com>") - - -class Feature(base.TestCase): - """Base model for single feature.""" - - __logger = logging.getLogger(__name__) - dir_results = "/home/opnfv/functest/results" - - def __init__(self, **kwargs): - super(Feature, self).__init__(**kwargs) - self.result_file = "{}/{}.log".format(self.dir_results, self.case_name) - try: - module = kwargs['run']['module'] - self.logger = logging.getLogger(module) - except KeyError: - self.__logger.warning( - "Cannot get module name %s. Using %s as fallback", - kwargs, self.case_name) - self.logger = logging.getLogger(self.case_name) - handler = logging.StreamHandler() - handler.setLevel(logging.WARN) - self.logger.addHandler(handler) - handler = logging.FileHandler(self.result_file) - handler.setLevel(logging.DEBUG) - self.logger.addHandler(handler) - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - - def execute(self, **kwargs): - """Execute the Python method. - - The subclasses must override the default implementation which - is false on purpose. - - The new implementation must return 0 if success or anything - else if failure. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - -1. - """ - # pylint: disable=unused-argument,no-self-use - return -1 - - def run(self, **kwargs): - """Run the feature. - - It allows executing any Python method by calling execute(). - - It sets the following attributes required to push the results - to DB: - - * result, - * start_time, - * stop_time. - - It doesn't fulfill details when pushing the results to the DB. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_OK if execute() returns 0, - TestCase.EX_RUN_ERROR otherwise. - """ - self.start_time = time.time() - exit_code = base.TestCase.EX_RUN_ERROR - self.result = 0 - try: - if self.execute(**kwargs) == 0: - exit_code = base.TestCase.EX_OK - self.result = 100 - except Exception: # pylint: disable=broad-except - self.__logger.exception("%s FAILED", self.project_name) - self.__logger.info("Test result is stored in '%s'", self.result_file) - self.stop_time = time.time() - return exit_code - - -class BashFeature(Feature): - """Class designed to run any bash command.""" - - __logger = logging.getLogger(__name__) - - def execute(self, **kwargs): - """Execute the cmd passed as arg - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - 0 if cmd returns 0, - -1 otherwise. - """ - ret = -1 - try: - cmd = kwargs["cmd"] - with open(self.result_file, 'w+') as f_stdout: - proc = subprocess.Popen(cmd.split(), stdout=f_stdout, - stderr=subprocess.STDOUT) - ret = proc.wait() - if ret != 0: - self.__logger.error("Execute command: %s failed", cmd) - except KeyError: - self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs) - return ret diff --git a/functest/core/robotframework.py b/functest/core/robotframework.py deleted file mode 100644 index 54574a68..00000000 --- a/functest/core/robotframework.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define classes required to run any Robot suites.""" - -from __future__ import division - -import errno -import logging -import os - -import robot.api -from robot.errors import RobotError -import robot.run -from robot.utils.robottime import timestamp_to_secs -from six import StringIO - -from functest.core import testcase - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - - -class ResultVisitor(robot.api.ResultVisitor): - """Visitor to get result details.""" - - def __init__(self): - self._data = [] - - def visit_test(self, test): - output = {} - output['name'] = test.name - output['parent'] = test.parent.name - output['status'] = test.status - output['starttime'] = test.starttime - output['endtime'] = test.endtime - output['critical'] = test.critical - output['text'] = test.message - output['elapsedtime'] = test.elapsedtime - self._data.append(output) - - def get_data(self): - """Get the details of the result.""" - return self._data - - -class RobotFramework(testcase.TestCase): - """RobotFramework runner.""" - - __logger = logging.getLogger(__name__) - dir_results = "/home/opnfv/functest/results" - - def __init__(self, **kwargs): - self.res_dir = os.path.join(self.dir_results, 'robot') - self.xml_file = os.path.join(self.res_dir, 'output.xml') - super(RobotFramework, self).__init__(**kwargs) - - def parse_results(self): - """Parse output.xml and get the details in it.""" - result = robot.api.ExecutionResult(self.xml_file) - visitor = ResultVisitor() - result.visit(visitor) - try: - self.result = 100 * ( - result.suite.statistics.critical.passed / - result.suite.statistics.critical.total) - except ZeroDivisionError: - self.__logger.error("No test has been run") - self.start_time = timestamp_to_secs(result.suite.starttime) - self.stop_time = timestamp_to_secs(result.suite.endtime) - self.details = {} - self.details['description'] = result.suite.name - self.details['tests'] = visitor.get_data() - - def run(self, **kwargs): - """Run the RobotFramework suites - - Here are the steps: - * create the output directories if required, - * get the results in output.xml, - * delete temporary files. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - EX_OK if all suites ran well. - EX_RUN_ERROR otherwise. - """ - try: - suites = kwargs["suites"] - variable = kwargs.get("variable", []) - variablefile = kwargs.get("variablefile", []) - except KeyError: - self.__logger.exception("Mandatory args were not passed") - return self.EX_RUN_ERROR - try: - os.makedirs(self.res_dir) - except OSError as ex: - if ex.errno != errno.EEXIST: - self.__logger.exception("Cannot create %s", self.res_dir) - return self.EX_RUN_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("Cannot create %s", self.res_dir) - return self.EX_RUN_ERROR - stream = StringIO() - robot.run(*suites, variable=variable, variablefile=variablefile, - output=self.xml_file, log='NONE', - report='NONE', stdout=stream) - self.__logger.info("\n" + stream.getvalue()) - self.__logger.info("Results were successfully generated") - try: - self.parse_results() - self.__logger.info("Results were successfully parsed") - except RobotError as ex: - self.__logger.error("Run suites before publishing: %s", ex.message) - return self.EX_RUN_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("Cannot parse results") - return self.EX_RUN_ERROR - return self.EX_OK diff --git a/functest/core/testcase.py b/functest/core/testcase.py deleted file mode 100644 index e8bb1409..00000000 --- a/functest/core/testcase.py +++ /dev/null @@ -1,227 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class of all Functest TestCases.""" - -from datetime import datetime -import json -import logging -import os -import re -import requests - -from functest.utils import decorators -from functest.utils import env - - -import prettytable - - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - - -class TestCase(object): - """Base model for single test case.""" - - EX_OK = os.EX_OK - """everything is OK""" - - EX_RUN_ERROR = os.EX_SOFTWARE - """run() failed""" - - EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1 - """push_to_db() failed""" - - EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2 - """results are false""" - - _job_name_rule = "(dai|week)ly-(.+?)-[0-9]*" - _headers = {'Content-Type': 'application/json'} - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - self.details = {} - self.project_name = kwargs.get('project_name', 'functest') - self.case_name = kwargs.get('case_name', '') - self.criteria = kwargs.get('criteria', 100) - self.result = 0 - self.start_time = 0 - self.stop_time = 0 - - def __str__(self): - try: - assert self.project_name - assert self.case_name - result = 'PASS' if(self.is_successful( - ) == TestCase.EX_OK) else 'FAIL' - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'project', 'duration', - 'result']) - msg.add_row([self.case_name, self.project_name, - self.get_duration(), result]) - return msg.get_string() - except AssertionError: - self.__logger.error("We cannot print invalid objects") - return super(TestCase, self).__str__() - - def get_duration(self): - """Return the duration of the test case. - - Returns: - duration if start_time and stop_time are set - "XX:XX" otherwise. - """ - try: - assert self.start_time - assert self.stop_time - if self.stop_time < self.start_time: - return "XX:XX" - return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod( - self.stop_time - self.start_time, 60)) - except Exception: # pylint: disable=broad-except - self.__logger.error("Please run test before getting the duration") - return "XX:XX" - - def is_successful(self): - """Interpret the result of the test case. - - It allows getting the result of TestCase. It completes run() - which only returns the execution status. - - It can be overriden if checking result is not suitable. - - Returns: - TestCase.EX_OK if result is 'PASS'. - TestCase.EX_TESTCASE_FAILED otherwise. - """ - try: - assert self.criteria - assert self.result is not None - if (not isinstance(self.result, str) and - not isinstance(self.criteria, str)): - if self.result >= self.criteria: - return TestCase.EX_OK - else: - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - # In this case criteria is unread. - self.__logger.warning( - "Please update result which must be an int!") - if self.result == 'PASS': - return TestCase.EX_OK - except AssertionError: - self.__logger.error("Please run test before checking the results") - return TestCase.EX_TESTCASE_FAILED - - def run(self, **kwargs): - """Run the test case. - - It allows running TestCase and getting its execution - status. - - The subclasses must override the default implementation which - is false on purpose. - - The new implementation must set the following attributes to - push the results to DB: - - * result, - * start_time, - * stop_time. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_RUN_ERROR. - """ - # pylint: disable=unused-argument - self.__logger.error("Run must be implemented") - return TestCase.EX_RUN_ERROR - - @decorators.can_dump_request_to_file - def push_to_db(self): - """Push the results of the test case to the DB. - - It allows publishing the results and checking the status. - - It could be overriden if the common implementation is not - suitable. - - The following attributes must be set before pushing the results to DB: - - * project_name, - * case_name, - * result, - * start_time, - * stop_time. - - The next vars must be set in env: - - * TEST_DB_URL, - * INSTALLER_TYPE, - * DEPLOY_SCENARIO, - * NODE_NAME, - * BUILD_TAG. - - Returns: - TestCase.EX_OK if results were pushed to DB. - TestCase.EX_PUSH_TO_DB_ERROR otherwise. - """ - try: - assert self.project_name - assert self.case_name - assert self.start_time - assert self.stop_time - url = env.get('TEST_DB_URL') - data = {"project_name": self.project_name, - "case_name": self.case_name, - "details": self.details} - data["installer"] = env.get('INSTALLER_TYPE') - data["scenario"] = env.get('DEPLOY_SCENARIO') - data["pod_name"] = env.get('NODE_NAME') - data["build_tag"] = env.get('BUILD_TAG') - data["criteria"] = 'PASS' if self.is_successful( - ) == TestCase.EX_OK else 'FAIL' - data["start_date"] = datetime.fromtimestamp( - self.start_time).strftime('%Y-%m-%d %H:%M:%S') - data["stop_date"] = datetime.fromtimestamp( - self.stop_time).strftime('%Y-%m-%d %H:%M:%S') - try: - data["version"] = re.search( - TestCase._job_name_rule, - env.get('BUILD_TAG')).group(2) - except Exception: # pylint: disable=broad-except - data["version"] = "unknown" - req = requests.post( - url, data=json.dumps(data, sort_keys=True), - headers=self._headers) - req.raise_for_status() - self.__logger.info( - "The results were successfully pushed to DB %s", url) - except AssertionError: - self.__logger.exception( - "Please run test before publishing the results") - return TestCase.EX_PUSH_TO_DB_ERROR - except requests.exceptions.HTTPError: - self.__logger.exception("The HTTP request raises issues") - return TestCase.EX_PUSH_TO_DB_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("The results cannot be pushed to DB") - return TestCase.EX_PUSH_TO_DB_ERROR - return TestCase.EX_OK - - def clean(self): - """Clean the resources. - - It can be overriden if resources must be deleted after - running the test case. - """ diff --git a/functest/core/unit.py b/functest/core/unit.py deleted file mode 100644 index 61b5a58d..00000000 --- a/functest/core/unit.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cable Television Laboratories, Inc. and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class to run unittest.TestSuite as TestCase.""" - -from __future__ import division - -import logging -import time -import unittest - -import six - -from functest.core import testcase - -__author__ = ("Steven Pisarski <s.pisarski@cablelabs.com>, " - "Cedric Ollivier <cedric.ollivier@orange.com>") - - -class Suite(testcase.TestCase): - """Base model for running unittest.TestSuite.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(Suite, self).__init__(**kwargs) - self.suite = None - - def run(self, **kwargs): - """Run the test suite. - - It allows running any unittest.TestSuite and getting its - execution status. - - By default, it runs the suite defined as instance attribute. - It can be overriden by passing name as arg. It must - conform with TestLoader.loadTestsFromName(). - - It sets the following attributes required to push the results - to DB: - - * result, - * start_time, - * stop_time, - * details. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_OK if any TestSuite has been run, - TestCase.EX_RUN_ERROR otherwise. - """ - try: - name = kwargs["name"] - try: - self.suite = unittest.TestLoader().loadTestsFromName(name) - except ImportError: - self.__logger.error("Can not import %s", name) - return testcase.TestCase.EX_RUN_ERROR - except KeyError: - pass - try: - assert self.suite - self.start_time = time.time() - stream = six.StringIO() - result = unittest.TextTestRunner( - stream=stream, verbosity=2).run(self.suite) - self.__logger.debug("\n\n%s", stream.getvalue()) - self.stop_time = time.time() - self.details = { - "testsRun": result.testsRun, - "failures": len(result.failures), - "errors": len(result.errors), - "stream": stream.getvalue()} - self.result = 100 * ( - (result.testsRun - (len(result.failures) + - len(result.errors))) / - result.testsRun) - return testcase.TestCase.EX_OK - except AssertionError: - self.__logger.error("No suite is defined") - return testcase.TestCase.EX_RUN_ERROR - except ZeroDivisionError: - self.__logger.error("No test has been run") - return testcase.TestCase.EX_RUN_ERROR diff --git a/functest/core/vnf.py b/functest/core/vnf.py deleted file mode 100644 index cf20492b..00000000 --- a/functest/core/vnf.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class of all VNF TestCases.""" - -import logging -import time -import uuid - -from snaps.config.user import UserConfig -from snaps.config.project import ProjectConfig -from snaps.openstack.create_user import OpenStackUser -from snaps.openstack.create_project import OpenStackProject -from snaps.openstack.tests import openstack_tests - -from functest.core import testcase -from functest.utils import constants - -__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, " - "Valentin Boucher <valentin.boucher@orange.com>") - - -class VnfPreparationException(Exception): - """Raise when VNF preparation cannot be executed.""" - - -class OrchestratorDeploymentException(Exception): - """Raise when orchestrator cannot be deployed.""" - - -class VnfDeploymentException(Exception): - """Raise when VNF cannot be deployed.""" - - -class VnfTestException(Exception): - """Raise when VNF cannot be tested.""" - - -class VnfOnBoarding(testcase.TestCase): - # pylint: disable=too-many-instance-attributes - """Base model for VNF test cases.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(VnfOnBoarding, self).__init__(**kwargs) - self.uuid = uuid.uuid4() - self.user_name = "{}-{}".format(self.case_name, self.uuid) - self.tenant_name = "{}-{}".format(self.case_name, self.uuid) - self.snaps_creds = {} - self.created_object = [] - self.os_project = None - self.tenant_description = "Created by OPNFV Functest: {}".format( - self.case_name) - - def run(self, **kwargs): - """ - Run of the VNF test case: - - * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...), - * Deploy the VNF, - * Perform tests on the VNF - - A VNF test case is successfull when the 3 steps are PASS - If one of the step is FAIL, the test case is FAIL - - Returns: - TestCase.EX_OK if result is 'PASS'. - TestCase.EX_TESTCASE_FAILED otherwise. - """ - self.start_time = time.time() - - try: - self.prepare() - if (self.deploy_orchestrator() and - self.deploy_vnf() and - self.test_vnf()): - self.stop_time = time.time() - # Calculation with different weight depending on the steps TODO - self.result = 100 - return testcase.TestCase.EX_OK - self.result = 0 - self.stop_time = time.time() - return testcase.TestCase.EX_TESTCASE_FAILED - except Exception: # pylint: disable=broad-except - self.stop_time = time.time() - self.__logger.exception("Exception on VNF testing") - return testcase.TestCase.EX_TESTCASE_FAILED - - def prepare(self): - """ - Prepare the environment for VNF testing: - - * Creation of a user, - * Creation of a tenant, - * Allocation admin role to the user on this tenant - - Returns base.TestCase.EX_OK if preparation is successfull - - Raise VnfPreparationException in case of problem - """ - try: - self.__logger.info( - "Prepare VNF: %s, description: %s", self.case_name, - self.tenant_description) - snaps_creds = openstack_tests.get_credentials( - os_env_file=constants.ENV_FILE) - - self.os_project = OpenStackProject( - snaps_creds, - ProjectConfig( - name=self.tenant_name, - description=self.tenant_description - )) - self.os_project.create() - self.created_object.append(self.os_project) - user_creator = OpenStackUser( - snaps_creds, - UserConfig( - name=self.user_name, - password=str(uuid.uuid4()), - roles={'admin': self.tenant_name})) - user_creator.create() - self.created_object.append(user_creator) - self.snaps_creds = user_creator.get_os_creds(self.tenant_name) - - return testcase.TestCase.EX_OK - except Exception: # pylint: disable=broad-except - self.__logger.exception("Exception raised during VNF preparation") - raise VnfPreparationException - - def deploy_orchestrator(self): - """ - Deploy an orchestrator (optional). - - If this method is overriden then raise orchestratorDeploymentException - if error during orchestrator deployment - """ - self.__logger.info("Deploy orchestrator (if necessary)") - return True - - def deploy_vnf(self): - """ - Deploy the VNF - - This function MUST be implemented by vnf test cases. - The details section MAY be updated in the vnf test cases. - - The deployment can be executed via a specific orchestrator - or using build-in orchestrators such as heat, OpenBaton, cloudify, - juju, onap, ... - - Returns: - True if the VNF is properly deployed - False if the VNF is not deployed - - Raise VnfDeploymentException if error during VNF deployment - """ - self.__logger.error("VNF must be deployed") - raise VnfDeploymentException - - def test_vnf(self): - """ - Test the VNF - - This function MUST be implemented by vnf test cases. - The details section MAY be updated in the vnf test cases. - - Once a VNF is deployed, it is assumed that specific test suite can be - run to validate the VNF. - Please note that the same test suite can be used on several test case - (e.g. clearwater test suite can be used whatever the orchestrator used - for the deployment) - - Returns: - True if VNF tests are PASS - False if test suite is FAIL - - Raise VnfTestException if error during VNF test - """ - self.__logger.error("VNF must be tested") - raise VnfTestException - - def clean(self): - """ - Clean VNF test case. - - It is up to the test providers to delete resources used for the tests. - By default we clean: - - * the user, - * the tenant - """ - self.__logger.info('Removing the VNF resources ..') - for creator in reversed(self.created_object): - try: - creator.clean() - except Exception as exc: # pylint: disable=broad-except - self.__logger.error('Unexpected error cleaning - %s', exc) diff --git a/functest/energy/__init__.py b/functest/energy/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/energy/__init__.py +++ /dev/null diff --git a/functest/energy/energy.py b/functest/energy/energy.py deleted file mode 100644 index a2652211..00000000 --- a/functest/energy/energy.py +++ /dev/null @@ -1,336 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -*- - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""This module manages calls to Energy recording API.""" - -import json -import logging -import traceback - -from functools import wraps -import requests -from six.moves import urllib - -from functest.utils import env - - -def finish_session(current_scenario): - """Finish a recording session.""" - if current_scenario is None: - EnergyRecorder.stop() - else: - EnergyRecorder.logger.debug("Restoring previous scenario (%s/%s)", - current_scenario["scenario"], - current_scenario["step"]) - EnergyRecorder.submit_scenario( - current_scenario["scenario"], - current_scenario["step"] - ) - - -def enable_recording(method): - """ - Record energy during method execution. - - Decorator to record energy during "method" exection. - - param method: Method to suround with start and stop - :type method: function - - .. note:: "method" should belong to a class having a "case_name" - attribute - """ - @wraps(method) - def wrapper(*args): - """ - Record energy during method execution (implementation). - - Wrapper for decorator to handle method arguments. - """ - current_scenario = EnergyRecorder.get_current_scenario() - EnergyRecorder.start(args[0].case_name) - try: - return_value = method(*args) - finish_session(current_scenario) - except Exception as exc: # pylint: disable=broad-except - EnergyRecorder.logger.exception(exc) - finish_session(current_scenario) - raise exc - return return_value - return wrapper - - -# Class to manage energy recording sessions -class EnergyRecorder(object): - """Manage Energy recording session.""" - - logger = logging.getLogger(__name__) - # Energy recording API connectivity settings - # see load_config method - energy_recorder_api = None - - # Default initial step - INITIAL_STEP = "running" - - # Default connection timeout - CONNECTION_TIMEOUT = 4 - - @staticmethod - def load_config(): - """ - Load connectivity settings from yaml. - - Load connectivity settings to Energy recording API - Use functest global config yaml file - (see functest_utils.get_functest_config) - """ - # Singleton pattern for energy_recorder_api static member - # Load only if not previouly done - if EnergyRecorder.energy_recorder_api is None: - assert env.get('NODE_NAME') - assert env.get('ENERGY_RECORDER_API_URL') - environment = env.get('NODE_NAME') - energy_recorder_uri = env.get( - 'ENERGY_RECORDER_API_URL') - - # Creds - creds_usr = env.get("ENERGY_RECORDER_API_USER") - creds_pass = env.get("ENERGY_RECORDER_API_PASSWORD") - - uri_comp = "/recorders/environment/" - uri_comp += urllib.parse.quote_plus(environment) - - if creds_usr and creds_pass: - energy_recorder_api_auth = (creds_usr, creds_pass) - else: - energy_recorder_api_auth = None - - try: - resp = requests.get(energy_recorder_uri + "/monitoring/ping", - auth=energy_recorder_api_auth, - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT) - api_available = json.loads(resp.text)["status"] == "OK" - EnergyRecorder.logger.info( - "API recorder available at : %s", - energy_recorder_uri + uri_comp) - except Exception as exc: # pylint: disable=broad-except - EnergyRecorder.logger.info( - "Energy recorder API is not available, cause=%s", - str(exc)) - api_available = False - # Final config - EnergyRecorder.energy_recorder_api = { - "uri": energy_recorder_uri + uri_comp, - "auth": energy_recorder_api_auth, - "available": api_available - } - return EnergyRecorder.energy_recorder_api["available"] - - @staticmethod - def submit_scenario(scenario, step): - """ - Submit a complet scenario definition to Energy recorder API. - - param scenario: Scenario name - :type scenario: string - param step: Step name - :type step: string - """ - try: - return_status = True - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Submitting scenario (%s/%s)", - scenario, step) - - # Create API payload - payload = { - "step": step, - "scenario": scenario - } - # Call API to start energy recording - response = requests.post( - EnergyRecorder.energy_recorder_api["uri"], - data=json.dumps(payload), - auth=EnergyRecorder.energy_recorder_api["auth"], - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code != 200: - EnergyRecorder.logger.error( - "Error while submitting scenario\n%s", - response.text) - return_status = False - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "submit_scenario: Unable to connect energy recorder API") - return_status = False - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while submitting scenarion to energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def start(scenario): - """ - Start a recording session for scenario. - - param scenario: Starting scenario - :type scenario: string - """ - return_status = True - try: - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Starting recording") - return_status = EnergyRecorder.submit_scenario( - scenario, - EnergyRecorder.INITIAL_STEP - ) - - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while starting energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def stop(): - """Stop current recording session.""" - return_status = True - try: - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Stopping recording") - - # Call API to stop energy recording - response = requests.delete( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code != 200: - EnergyRecorder.logger.error( - "Error while stating energy recording session\n%s", - response.text) - return_status = False - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "stop: Unable to connect energy recorder API") - return_status = False - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while stoping energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def set_step(step): - """Notify energy recording service of current step of the testcase.""" - return_status = True - try: - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Setting step") - - # Create API payload - payload = { - "step": step, - } - - # Call API to define step - response = requests.post( - EnergyRecorder.energy_recorder_api["uri"] + "/step", - data=json.dumps(payload), - auth=EnergyRecorder.energy_recorder_api["auth"], - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code != 200: - EnergyRecorder.logger.error( - "Error while setting current step of testcase\n%s", - response.text) - return_status = False - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "set_step: Unable to connect energy recorder API") - return_status = False - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while setting step on energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def get_current_scenario(): - """Get current running scenario (if any, None else).""" - return_value = None - try: - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Getting current scenario") - - # Call API get running scenario - response = requests.get( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code == 200: - return_value = json.loads(response.text) - elif response.status_code == 404: - EnergyRecorder.logger.info( - "No current running scenario at %s", - EnergyRecorder.energy_recorder_api["uri"]) - return_value = None - else: - EnergyRecorder.logger.error( - "Error while getting current scenario\n%s", - response.text) - return_value = None - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "get_currernt_sceario: Unable to connect energy recorder API") - return_value = None - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while getting current scenario from energy recorder API" - "\n%s", traceback.format_exc() - ) - return_value = None - return return_value diff --git a/functest/tests/__init__.py b/functest/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/tests/__init__.py +++ /dev/null diff --git a/functest/tests/unit/__init__.py b/functest/tests/unit/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/tests/unit/__init__.py +++ /dev/null diff --git a/functest/tests/unit/ci/__init__.py b/functest/tests/unit/ci/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/tests/unit/ci/__init__.py +++ /dev/null diff --git a/functest/tests/unit/ci/test_run_tests.py b/functest/tests/unit/ci/test_run_tests.py deleted file mode 100644 index b8dca20c..00000000 --- a/functest/tests/unit/ci/test_run_tests.py +++ /dev/null @@ -1,267 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest -import os - -import mock - -from functest.ci import run_tests -from functest.core.testcase import TestCase - - -class FakeModule(TestCase): - - def run(self, **kwargs): - return TestCase.EX_OK - - -class RunTestsTesting(unittest.TestCase): - - def setUp(self): - self.runner = run_tests.Runner() - mock_test_case = mock.Mock() - mock_test_case.is_successful.return_value = TestCase.EX_OK - self.runner.executed_test_cases['test1'] = mock_test_case - self.runner.executed_test_cases['test2'] = mock_test_case - self.sep = 'test_sep' - self.creds = {'OS_AUTH_URL': 'http://test_ip:test_port/v2.0', - 'OS_USERNAME': 'test_os_username', - 'OS_TENANT_NAME': 'test_tenant', - 'OS_PASSWORD': 'test_password'} - self.test = {'test_name': 'test_name'} - self.tier = mock.Mock() - test1 = mock.Mock() - test1.get_name.return_value = 'test1' - test2 = mock.Mock() - test2.get_name.return_value = 'test2' - attrs = {'get_name.return_value': 'test_tier', - 'get_tests.return_value': [test1, test2], - 'get_ci_loop.return_value': 'test_ci_loop', - 'get_test_names.return_value': ['test1', 'test2']} - self.tier.configure_mock(**attrs) - - self.tiers = mock.Mock() - attrs = {'get_tiers.return_value': [self.tier]} - self.tiers.configure_mock(**attrs) - - self.run_tests_parser = run_tests.RunTestsParser() - - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test') - def test_get_run_dict(self, *args): - retval = {'run': mock.Mock()} - args[0].return_value = retval - self.assertEqual(self.runner.get_run_dict('test_name'), retval['run']) - args[0].assert_called_once_with('test_name') - - @mock.patch('functest.ci.run_tests.LOGGER.error') - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test', - return_value=None) - def test_get_run_dict_config_ko(self, *args): - testname = 'test_name' - self.assertEqual(self.runner.get_run_dict(testname), None) - args[0].return_value = {} - self.assertEqual(self.runner.get_run_dict(testname), None) - calls = [mock.call(testname), mock.call(testname)] - args[0].assert_has_calls(calls) - calls = [mock.call("Cannot get %s's config options", testname), - mock.call("Cannot get %s's config options", testname)] - args[1].assert_has_calls(calls) - - @mock.patch('functest.ci.run_tests.LOGGER.exception') - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test', - side_effect=Exception) - def test_get_run_dict_exception(self, *args): - testname = 'test_name' - self.assertEqual(self.runner.get_run_dict(testname), None) - args[1].assert_called_once_with( - "Cannot get %s's config options", testname) - - def _test_source_envfile(self, msg, key='OS_TENANT_NAME', value='admin'): - try: - del os.environ[key] - except Exception: # pylint: disable=broad-except - pass - envfile = 'rc_file' - with mock.patch('six.moves.builtins.open', - mock.mock_open(read_data=msg)) as mock_method,\ - mock.patch('os.path.isfile', return_value=True): - mock_method.return_value.__iter__ = lambda self: iter( - self.readline, '') - self.runner.source_envfile(envfile) - mock_method.assert_called_once_with(envfile, 'r') - self.assertEqual(os.environ[key], value) - - def test_source_envfile(self): - self._test_source_envfile('OS_TENANT_NAME=admin') - self._test_source_envfile('OS_TENANT_NAME= admin') - self._test_source_envfile('OS_TENANT_NAME = admin') - self._test_source_envfile('OS_TENANT_NAME = "admin"') - self._test_source_envfile('export OS_TENANT_NAME=admin') - self._test_source_envfile('export OS_TENANT_NAME =admin') - self._test_source_envfile('export OS_TENANT_NAME = admin') - self._test_source_envfile('export OS_TENANT_NAME = "admin"') - # This test will fail as soon as rc_file is fixed - self._test_source_envfile( - 'export "\'OS_TENANT_NAME\'" = "\'admin\'"') - - def test_get_dict_by_test(self): - with mock.patch('six.moves.builtins.open', mock.mock_open()), \ - mock.patch('yaml.safe_load') as mock_yaml: - mock_obj = mock.Mock() - testcase_dict = {'case_name': 'testname', - 'criteria': 50} - attrs = {'get.return_value': [{'testcases': [testcase_dict]}]} - mock_obj.configure_mock(**attrs) - mock_yaml.return_value = mock_obj - self.assertDictEqual( - run_tests.Runner.get_dict_by_test('testname'), - testcase_dict) - - @mock.patch('functest.ci.run_tests.Runner.get_run_dict', - return_value=None) - def test_run_tests_import_exception(self, *args): - mock_test = mock.Mock() - kwargs = {'get_name.return_value': 'test_name', - 'needs_clean.return_value': False} - mock_test.configure_mock(**kwargs) - with self.assertRaises(Exception) as context: - self.runner.run_test(mock_test) - args[0].assert_called_with('test_name') - msg = "Cannot import the class for the test case." - self.assertTrue(msg in str(context.exception)) - - @mock.patch('importlib.import_module', name="module", - return_value=mock.Mock(test_class=mock.Mock( - side_effect=FakeModule))) - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test') - def test_run_tests_default(self, *args): - mock_test = mock.Mock() - kwargs = {'get_name.return_value': 'test_name', - 'needs_clean.return_value': True} - mock_test.configure_mock(**kwargs) - test_run_dict = {'module': 'test_module', - 'class': 'test_class'} - with mock.patch('functest.ci.run_tests.Runner.get_run_dict', - return_value=test_run_dict): - self.runner.clean_flag = True - self.runner.run_test(mock_test) - args[0].assert_called_with('test_name') - args[1].assert_called_with('test_module') - self.assertEqual(self.runner.overall_result, - run_tests.Result.EX_OK) - - @mock.patch('functest.ci.run_tests.Runner.run_test', - return_value=TestCase.EX_OK) - def test_run_tier_default(self, *mock_methods): - self.assertEqual(self.runner.run_tier(self.tier), - run_tests.Result.EX_OK) - mock_methods[0].assert_called_with(mock.ANY) - - @mock.patch('functest.ci.run_tests.LOGGER.info') - def test_run_tier_missing_test(self, mock_logger_info): - self.tier.get_tests.return_value = None - self.assertEqual(self.runner.run_tier(self.tier), - run_tests.Result.EX_ERROR) - self.assertTrue(mock_logger_info.called) - - @mock.patch('functest.ci.run_tests.LOGGER.info') - @mock.patch('functest.ci.run_tests.Runner.run_tier') - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_run_all_default(self, *mock_methods): - os.environ['CI_LOOP'] = 'test_ci_loop' - self.runner.run_all() - mock_methods[1].assert_not_called() - self.assertTrue(mock_methods[2].called) - - @mock.patch('functest.ci.run_tests.LOGGER.info') - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_run_all_missing_tier(self, *mock_methods): - os.environ['CI_LOOP'] = 'loop_re_not_available' - self.runner.run_all() - self.assertTrue(mock_methods[1].called) - - @mock.patch('functest.ci.run_tests.Runner.source_envfile', - side_effect=Exception) - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_main_failed(self, *mock_methods): - kwargs = {'test': 'test_name', 'noclean': True, 'report': True} - args = {'get_tier.return_value': False, - 'get_test.return_value': False} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**args) - self.assertEqual(self.runner.main(**kwargs), - run_tests.Result.EX_ERROR) - mock_methods[1].assert_called_once_with() - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - @mock.patch('functest.ci.run_tests.Runner.run_test', - return_value=TestCase.EX_OK) - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_main_tier(self, *mock_methods): - mock_tier = mock.Mock() - test_mock = mock.Mock() - test_mock.get_name.return_value = 'test1' - args = {'get_name.return_value': 'tier_name', - 'get_tests.return_value': [test_mock]} - mock_tier.configure_mock(**args) - kwargs = {'test': 'tier_name', 'noclean': True, 'report': True} - args = {'get_tier.return_value': mock_tier, - 'get_test.return_value': None} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**args) - self.assertEqual(self.runner.main(**kwargs), - run_tests.Result.EX_OK) - mock_methods[1].assert_called() - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - @mock.patch('functest.ci.run_tests.Runner.run_test', - return_value=TestCase.EX_OK) - def test_main_test(self, *mock_methods): - kwargs = {'test': 'test_name', 'noclean': True, 'report': True} - args = {'get_tier.return_value': None, - 'get_test.return_value': 'test_name'} - self.runner.tiers = mock.Mock() - mock_methods[1].return_value = self.creds - self.runner.tiers.configure_mock(**args) - self.assertEqual(self.runner.main(**kwargs), - run_tests.Result.EX_OK) - mock_methods[0].assert_called_once_with('test_name') - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - @mock.patch('functest.ci.run_tests.Runner.run_all') - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_main_all_tier(self, *args): - kwargs = {'get_tier.return_value': None, - 'get_test.return_value': None} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**kwargs) - self.assertEqual( - self.runner.main(test='all', noclean=True, report=True), - run_tests.Result.EX_OK) - args[0].assert_called_once_with(None) - args[1].assert_called_once_with() - args[2].assert_called_once_with() - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - def test_main_any_tier_test_ko(self, *args): - kwargs = {'get_tier.return_value': None, - 'get_test.return_value': None} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**kwargs) - self.assertEqual( - self.runner.main(test='any', noclean=True, report=True), - run_tests.Result.EX_ERROR) - args[0].assert_called_once_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/ci/test_tier_builder.py b/functest/tests/unit/ci/test_tier_builder.py deleted file mode 100644 index ef6a007b..00000000 --- a/functest/tests/unit/ci/test_tier_builder.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.ci import tier_builder - - -class TierBuilderTesting(unittest.TestCase): - - def setUp(self): - self.dependency = { - 'installer': 'test_installer', 'scenario': 'test_scenario'} - self.testcase = { - 'dependencies': self.dependency, 'enabled': 'true', - 'case_name': 'test_name', 'criteria': 'test_criteria', - 'blocking': 'test_blocking', 'description': 'test_desc', - 'project_name': 'project_name'} - self.dic_tier = { - 'name': 'test_tier', 'order': 'test_order', - 'ci_loop': 'test_ci_loop', 'description': 'test_desc', - 'testcases': [self.testcase]} - self.mock_yaml = mock.Mock() - attrs = {'get.return_value': [self.dic_tier]} - self.mock_yaml.configure_mock(**attrs) - - with mock.patch('functest.ci.tier_builder.yaml.safe_load', - return_value=self.mock_yaml), \ - mock.patch('six.moves.builtins.open', mock.mock_open()): - self.tierbuilder = tier_builder.TierBuilder( - 'test_installer', 'test_scenario', 'testcases_file') - self.tier_obj = self.tierbuilder.tier_objects[0] - - def test_get_tiers(self): - self.assertEqual(self.tierbuilder.get_tiers(), - [self.tier_obj]) - - def test_get_tier_names(self): - self.assertEqual(self.tierbuilder.get_tier_names(), - ['test_tier']) - - def test_get_tier_present_tier(self): - self.assertEqual(self.tierbuilder.get_tier('test_tier'), - self.tier_obj) - - def test_get_tier_missing_tier(self): - self.assertEqual(self.tierbuilder.get_tier('test_tier2'), - None) - - def test_get_test_present_test(self): - self.assertEqual(self.tierbuilder.get_test('test_name'), - self.tier_obj.get_test('test_name')) - - def test_get_test_missing_test(self): - self.assertEqual(self.tierbuilder.get_test('test_name2'), - None) - - def test_get_tests_present_tier(self): - self.assertEqual(self.tierbuilder.get_tests('test_tier'), - self.tier_obj.tests_array) - - def test_get_tests_missing_tier(self): - self.assertEqual(self.tierbuilder.get_tests('test_tier2'), - None) - - def test_get_tier_name_ok(self): - self.assertEqual(self.tierbuilder.get_tier_name('test_name'), - 'test_tier') - - def test_get_tier_name_ko(self): - self.assertEqual(self.tierbuilder.get_tier_name('test_name2'), None) - - def test_str(self): - message = str(self.tierbuilder) - self.assertTrue('test_tier' in message) - self.assertTrue('test_order' in message) - self.assertTrue('test_ci_loop' in message) - self.assertTrue('test_desc' in message) - self.assertTrue('test_name' in message) - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/ci/test_tier_handler.py b/functest/tests/unit/ci/test_tier_handler.py deleted file mode 100644 index 5e784128..00000000 --- a/functest/tests/unit/ci/test_tier_handler.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.ci import tier_handler - - -class TierHandlerTesting(unittest.TestCase): - # pylint: disable=too-many-public-methods - - def setUp(self): - self.test = mock.Mock() - attrs = {'get_name.return_value': 'test_name'} - self.test.configure_mock(**attrs) - self.mock_depend = mock.Mock() - attrs = {'get_scenario.return_value': 'test_scenario', - 'get_installer.return_value': 'test_installer'} - self.mock_depend.configure_mock(**attrs) - self.tier = tier_handler.Tier( - 'test_tier', 'test_order', 'test_ci_loop', description='test_desc') - self.testcase = tier_handler.TestCase( - 'test_name', 'true', self.mock_depend, 'test_criteria', - True, description='test_desc', project='project_name') - self.dependency = tier_handler.Dependency( - 'test_installer', 'test_scenario') - self.testcase.str = self.testcase.__str__() - self.dependency.str = self.dependency.__str__() - self.tier.str = self.tier.__str__() - - def test_split_text(self): - test_str = 'this is for testing' - self.assertEqual(tier_handler.split_text(test_str, 10), - ['this is ', 'for ', 'testing ']) - - def test_add_test(self): - self.tier.add_test(self.test) - self.assertEqual(self.tier.tests_array, [self.test]) - - def test_get_skipped_test1(self): - self.assertEqual(self.tier.get_skipped_test(), []) - - def test_get_skipped_test2(self): - self.tier.skip_test(self.test) - self.assertEqual(self.tier.get_skipped_test(), [self.test]) - - def test_get_tests(self): - self.tier.tests_array = [self.test] - self.assertEqual(self.tier.get_tests(), [self.test]) - - def test_get_test_names(self): - self.tier.tests_array = [self.test] - self.assertEqual(self.tier.get_test_names(), ['test_name']) - - def test_get_test(self): - self.tier.tests_array = [self.test] - with mock.patch.object(self.tier, 'is_test', return_value=True): - self.assertEqual(self.tier.get_test('test_name'), self.test) - - def test_get_test_missing_test(self): - self.tier.tests_array = [self.test] - with mock.patch.object(self.tier, 'is_test', return_value=False): - self.assertEqual(self.tier.get_test('test_name'), None) - - def test_get_name(self): - self.assertEqual(self.tier.get_name(), 'test_tier') - - def test_get_order(self): - self.assertEqual(self.tier.get_order(), 'test_order') - - def test_get_ci_loop(self): - self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop') - - def test_testcase_is_none_in_item(self): - self.assertEqual(tier_handler.TestCase.is_none("item"), False) - - def test_testcase_is_none_no_item(self): - self.assertEqual(tier_handler.TestCase.is_none(None), True) - - def test_testcase_is_compatible(self): - self.assertEqual( - self.testcase.is_compatible('test_installer', 'test_scenario'), - True) - - def test_testcase_is_compatible_2(self): - self.assertEqual( - self.testcase.is_compatible('missing_installer', 'test_scenario'), - False) - self.assertEqual( - self.testcase.is_compatible('test_installer', 'missing_scenario'), - False) - - @mock.patch('re.search', side_effect=TypeError) - def test_testcase_is_compatible3(self, *args): - self.assertEqual( - self.testcase.is_compatible('test_installer', 'test_scenario'), - False) - args[0].assert_called_once_with('test_installer', 'test_installer') - - def test_testcase_get_name(self): - self.assertEqual(self.tier.get_name(), 'test_tier') - - def test_testcase_is_enabled(self): - self.assertEqual(self.testcase.is_enabled(), 'true') - - def test_testcase_get_criteria(self): - self.assertEqual(self.testcase.get_criteria(), 'test_criteria') - - def test_testcase_is_blocking(self): - self.assertTrue(self.testcase.is_blocking()) - - def test_testcase_get_project(self): - self.assertEqual(self.testcase.get_project(), 'project_name') - - def test_testcase_get_order(self): - self.assertEqual(self.tier.get_order(), 'test_order') - - def test_testcase_get_ci_loop(self): - self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop') - - def test_dependency_get_installer(self): - self.assertEqual(self.dependency.get_installer(), 'test_installer') - - def test_dependency_get_scenario(self): - self.assertEqual(self.dependency.get_scenario(), 'test_scenario') - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/__init__.py b/functest/tests/unit/core/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/tests/unit/core/__init__.py +++ /dev/null diff --git a/functest/tests/unit/core/test_feature.py b/functest/tests/unit/core/test_feature.py deleted file mode 100644 index 3219c726..00000000 --- a/functest/tests/unit/core/test_feature.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.core import feature -from functest.core import testcase - - -class FeatureTestingBase(unittest.TestCase): - - _case_name = "foo" - _project_name = "bar" - _repo = "dir_repo_bar" - _cmd = "run_bar_tests.py" - _output_file = '/home/opnfv/functest/results/foo.log' - feature = None - - @mock.patch('time.time', side_effect=[1, 2]) - def _test_run(self, status, mock_method=None): - self.assertEqual(self.feature.run(cmd=self._cmd), status) - if status == testcase.TestCase.EX_OK: - self.assertEqual(self.feature.result, 100) - else: - self.assertEqual(self.feature.result, 0) - mock_method.assert_has_calls([mock.call(), mock.call()]) - self.assertEqual(self.feature.start_time, 1) - self.assertEqual(self.feature.stop_time, 2) - - def test_logger_module_ko(self): - with mock.patch('six.moves.builtins.open'): - self.feature = feature.Feature( - project_name=self._project_name, case_name=self._case_name) - self.assertEqual(self.feature.logger.name, self._case_name) - - def test_logger_module(self): - with mock.patch('six.moves.builtins.open'): - self.feature = feature.Feature( - project_name=self._project_name, case_name=self._case_name, - run={'module': 'bar'}) - self.assertEqual(self.feature.logger.name, 'bar') - - -class FeatureTesting(FeatureTestingBase): - - def setUp(self): - # logging must be disabled else it calls time.time() - # what will break these unit tests. - logging.disable(logging.CRITICAL) - with mock.patch('six.moves.builtins.open'): - self.feature = feature.Feature( - project_name=self._project_name, case_name=self._case_name) - - def test_run_exc(self): - # pylint: disable=bad-continuation - with mock.patch.object( - self.feature, 'execute', - side_effect=Exception) as mock_method: - self._test_run(testcase.TestCase.EX_RUN_ERROR) - mock_method.assert_called_once_with(cmd=self._cmd) - - def test_run(self): - self._test_run(testcase.TestCase.EX_RUN_ERROR) - - -class BashFeatureTesting(FeatureTestingBase): - - def setUp(self): - # logging must be disabled else it calls time.time() - # what will break these unit tests. - logging.disable(logging.CRITICAL) - with mock.patch('six.moves.builtins.open'): - self.feature = feature.BashFeature( - project_name=self._project_name, case_name=self._case_name) - - @mock.patch('subprocess.Popen') - def test_run_no_cmd(self, mock_subproc): - self.assertEqual( - self.feature.run(), testcase.TestCase.EX_RUN_ERROR) - mock_subproc.assert_not_called() - - @mock.patch('subprocess.Popen') - def test_run_ko(self, mock_subproc): - with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen: - mock_obj = mock.Mock() - attrs = {'wait.return_value': 1} - mock_obj.configure_mock(**attrs) - - mock_subproc.return_value = mock_obj - self._test_run(testcase.TestCase.EX_RUN_ERROR) - mopen.assert_called_once_with(self._output_file, "w+") - - @mock.patch('subprocess.Popen') - def test_run(self, mock_subproc): - with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen: - mock_obj = mock.Mock() - attrs = {'wait.return_value': 0} - mock_obj.configure_mock(**attrs) - - mock_subproc.return_value = mock_obj - self._test_run(testcase.TestCase.EX_OK) - mopen.assert_called_once_with(self._output_file, "w+") - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/test_robotframework.py b/functest/tests/unit/core/test_robotframework.py deleted file mode 100644 index 28fd15f6..00000000 --- a/functest/tests/unit/core/test_robotframework.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the classes required to fully cover robot.""" - -import errno -import logging -import os -import unittest - -import mock -from robot.errors import DataError, RobotError -from robot.result import model -from robot.utils.robottime import timestamp_to_secs - -from functest.core import robotframework - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - - -class ResultVisitorTesting(unittest.TestCase): - - """The class testing ResultVisitor.""" - # pylint: disable=missing-docstring - - def setUp(self): - self.visitor = robotframework.ResultVisitor() - - def test_empty(self): - self.assertFalse(self.visitor.get_data()) - - def test_ok(self): - data = {'name': 'foo', - 'parent': 'bar', - 'status': 'PASS', - 'starttime': "20161216 16:00:00.000", - 'endtime': "20161216 16:00:01.000", - 'elapsedtime': 1000, - 'text': 'Hello, World!', - 'critical': True} - test = model.TestCase( - name=data['name'], status=data['status'], message=data['text'], - starttime=data['starttime'], endtime=data['endtime']) - test.parent = mock.Mock() - config = {'name': data['parent'], - 'criticality.test_is_critical.return_value': data[ - 'critical']} - test.parent.configure_mock(**config) - self.visitor.visit_test(test) - self.assertEqual(self.visitor.get_data(), [data]) - - -class ParseResultTesting(unittest.TestCase): - - """The class testing RobotFramework.parse_results().""" - # pylint: disable=missing-docstring - - _config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000', - 'endtime': '20161216 16:00:01.000'} - - def setUp(self): - self.test = robotframework.RobotFramework( - case_name='robot', project_name='functest') - - @mock.patch('robot.api.ExecutionResult', side_effect=DataError) - def test_raises_exc(self, mock_method): - with self.assertRaises(DataError): - self.test.parse_results() - mock_method.assert_called_once_with( - os.path.join(self.test.res_dir, 'output.xml')) - - def _test_result(self, config, result): - suite = mock.Mock() - suite.configure_mock(**config) - with mock.patch('robot.api.ExecutionResult', - return_value=mock.Mock(suite=suite)): - self.test.parse_results() - self.assertEqual(self.test.result, result) - self.assertEqual(self.test.start_time, - timestamp_to_secs(config['starttime'])) - self.assertEqual(self.test.stop_time, - timestamp_to_secs(config['endtime'])) - self.assertEqual(self.test.details, - {'description': config['name'], 'tests': []}) - - def test_null_passed(self): - self._config.update({'statistics.critical.passed': 0, - 'statistics.critical.total': 20}) - self._test_result(self._config, 0) - - def test_no_test(self): - self._config.update({'statistics.critical.passed': 20, - 'statistics.critical.total': 0}) - self._test_result(self._config, 0) - - def test_half_success(self): - self._config.update({'statistics.critical.passed': 10, - 'statistics.critical.total': 20}) - self._test_result(self._config, 50) - - def test_success(self): - self._config.update({'statistics.critical.passed': 20, - 'statistics.critical.total': 20}) - self._test_result(self._config, 100) - - -class RunTesting(unittest.TestCase): - - """The class testing RobotFramework.run().""" - # pylint: disable=missing-docstring - - suites = ["foo"] - variable = [] - variablefile = [] - - def setUp(self): - self.test = robotframework.RobotFramework( - case_name='robot', project_name='functest') - - def test_exc_key_error(self): - self.assertEqual(self.test.run(), self.test.EX_RUN_ERROR) - - @mock.patch('robot.run') - def _test_makedirs_exc(self, *args): - with mock.patch.object(self.test, 'parse_results') as mock_method: - self.assertEqual( - self.test.run( - suites=self.suites, variable=self.variable, - variablefile=self.variablefile), - self.test.EX_RUN_ERROR) - args[0].assert_not_called() - mock_method.asser_not_called() - - @mock.patch('os.makedirs', side_effect=Exception) - def test_makedirs_exc(self, *args): - self._test_makedirs_exc() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('os.makedirs', side_effect=OSError) - def test_makedirs_oserror(self, *args): - self._test_makedirs_exc() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('robot.run') - def _test_makedirs(self, *args): - with mock.patch.object(self.test, 'parse_results') as mock_method: - self.assertEqual( - self.test.run(suites=self.suites, variable=self.variable), - self.test.EX_OK) - args[0].assert_called_once_with( - *self.suites, log='NONE', output=self.test.xml_file, - report='NONE', stdout=mock.ANY, variable=self.variable, - variablefile=self.variablefile) - mock_method.assert_called_once_with() - - @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, '')) - def test_makedirs_oserror17(self, *args): - self._test_makedirs() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('os.makedirs') - def test_makedirs(self, *args): - self._test_makedirs() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('robot.run') - def _test_parse_results(self, status, *args): - self.assertEqual( - self.test.run( - suites=self.suites, variable=self.variable, - variablefile=self.variablefile), - status) - args[0].assert_called_once_with( - *self.suites, log='NONE', output=self.test.xml_file, - report='NONE', stdout=mock.ANY, variable=self.variable, - variablefile=self.variablefile) - - def test_parse_results_exc(self): - with mock.patch.object(self.test, 'parse_results', - side_effect=Exception) as mock_method: - self._test_parse_results(self.test.EX_RUN_ERROR) - mock_method.assert_called_once_with() - - def test_parse_results_robot_error(self): - with mock.patch.object(self.test, 'parse_results', - side_effect=RobotError('foo')) as mock_method: - self._test_parse_results(self.test.EX_RUN_ERROR) - mock_method.assert_called_once_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/test_testcase.py b/functest/tests/unit/core/test_testcase.py deleted file mode 100644 index e11e5ff7..00000000 --- a/functest/tests/unit/core/test_testcase.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the class required to fully cover testcase.""" - -from datetime import datetime -import json -import logging -import os -import unittest - -from functest.core import testcase - -import mock -import requests - - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - - -class TestCaseTesting(unittest.TestCase): - """The class testing TestCase.""" - - # pylint: disable=missing-docstring,too-many-public-methods - - _case_name = "base" - _project_name = "functest" - _published_result = "PASS" - _test_db_url = "http://testresults.opnfv.org/test/api/v1/results" - _headers = {'Content-Type': 'application/json'} - - def setUp(self): - self.test = testcase.TestCase(case_name=self._case_name, - project_name=self._project_name) - self.test.start_time = 1 - self.test.stop_time = 2 - self.test.result = 100 - self.test.details = {"Hello": "World"} - os.environ['TEST_DB_URL'] = TestCaseTesting._test_db_url - os.environ['INSTALLER_TYPE'] = "installer_type" - os.environ['DEPLOY_SCENARIO'] = "scenario" - os.environ['NODE_NAME'] = "node_name" - os.environ['BUILD_TAG'] = "foo-daily-master-bar" - - def test_run_unimplemented(self): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_RUN_ERROR) - - def _test_pushdb_missing_attribute(self): - self.assertEqual(self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - - def test_pushdb_no_project_name(self): - self.test.project_name = None - self._test_pushdb_missing_attribute() - - def test_pushdb_no_case_name(self): - self.test.case_name = None - self._test_pushdb_missing_attribute() - - def test_pushdb_no_start_time(self): - self.test.start_time = None - self._test_pushdb_missing_attribute() - - def test_pushdb_no_stop_time(self): - self.test.stop_time = None - self._test_pushdb_missing_attribute() - - def _test_pushdb_missing_env(self, var): - del os.environ[var] - self.assertEqual(self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - - def test_pushdb_no_db_url(self): - self._test_pushdb_missing_env('TEST_DB_URL') - - def test_pushdb_no_installer_type(self): - self._test_pushdb_missing_env('INSTALLER_TYPE') - - def test_pushdb_no_deploy_scenario(self): - self._test_pushdb_missing_env('DEPLOY_SCENARIO') - - def test_pushdb_no_node_name(self): - self._test_pushdb_missing_env('NODE_NAME') - - def test_pushdb_no_build_tag(self): - self._test_pushdb_missing_env('BUILD_TAG') - - @mock.patch('requests.post') - def test_pushdb_bad_start_time(self, mock_function=None): - self.test.start_time = "1" - self.assertEqual( - self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - mock_function.assert_not_called() - - @mock.patch('requests.post') - def test_pushdb_bad_end_time(self, mock_function=None): - self.test.stop_time = "2" - self.assertEqual( - self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - mock_function.assert_not_called() - - def _get_data(self): - return { - "build_tag": os.environ['BUILD_TAG'], - "case_name": self._case_name, - "criteria": 'PASS' if self.test.is_successful( - ) == self.test.EX_OK else 'FAIL', - "details": self.test.details, - "installer": os.environ['INSTALLER_TYPE'], - "pod_name": os.environ['NODE_NAME'], - "project_name": self.test.project_name, - "scenario": os.environ['DEPLOY_SCENARIO'], - "start_date": datetime.fromtimestamp( - self.test.start_time).strftime('%Y-%m-%d %H:%M:%S'), - "stop_date": datetime.fromtimestamp( - self.test.stop_time).strftime('%Y-%m-%d %H:%M:%S'), - "version": "master"} - - @mock.patch('requests.post') - def _test_pushdb_version(self, mock_function=None, **kwargs): - payload = self._get_data() - payload["version"] = kwargs.get("version", "unknown") - self.assertEqual(self.test.push_to_db(), testcase.TestCase.EX_OK) - mock_function.assert_called_once_with( - os.environ['TEST_DB_URL'], - data=json.dumps(payload, sort_keys=True), - headers=self._headers) - - def test_pushdb_daily_job(self): - self._test_pushdb_version(version="master") - - def test_pushdb_weekly_job(self): - os.environ['BUILD_TAG'] = 'foo-weekly-master-bar' - self._test_pushdb_version(version="master") - - def test_pushdb_random_build_tag(self): - os.environ['BUILD_TAG'] = 'whatever' - self._test_pushdb_version(version="unknown") - - @mock.patch('requests.post', return_value=mock.Mock( - raise_for_status=mock.Mock( - side_effect=requests.exceptions.HTTPError))) - def test_pushdb_http_errors(self, mock_function=None): - self.assertEqual( - self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - mock_function.assert_called_once_with( - os.environ['TEST_DB_URL'], - data=json.dumps(self._get_data(), sort_keys=True), - headers=self._headers) - - def test_check_criteria_missing(self): - self.test.criteria = None - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_missing(self): - self.test.result = None - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_failed(self): - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - self.test.result = 'FAIL' - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_pass(self): - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - self.test.result = 'PASS' - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_OK) - - def test_check_result_lt(self): - self.test.result = 50 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_eq(self): - self.test.result = 100 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_OK) - - def test_check_result_gt(self): - self.test.criteria = 50 - self.test.result = 100 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_OK) - - def test_check_result_zero(self): - self.test.criteria = 0 - self.test.result = 0 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_get_duration_start_ko(self): - self.test.start_time = None - self.assertEqual(self.test.get_duration(), "XX:XX") - self.test.start_time = 0 - self.assertEqual(self.test.get_duration(), "XX:XX") - - def test_get_duration_end_ko(self): - self.test.stop_time = None - self.assertEqual(self.test.get_duration(), "XX:XX") - self.test.stop_time = 0 - self.assertEqual(self.test.get_duration(), "XX:XX") - - def test_get_invalid_duration(self): - self.test.start_time = 2 - self.test.stop_time = 1 - self.assertEqual(self.test.get_duration(), "XX:XX") - - def test_get_zero_duration(self): - self.test.start_time = 2 - self.test.stop_time = 2 - self.assertEqual(self.test.get_duration(), "00:00") - - def test_get_duration(self): - self.test.start_time = 1 - self.test.stop_time = 180 - self.assertEqual(self.test.get_duration(), "02:59") - - def test_str_project_name_ko(self): - self.test.project_name = None - self.assertIn("<functest.core.testcase.TestCase object at", - str(self.test)) - - def test_str_case_name_ko(self): - self.test.case_name = None - self.assertIn("<functest.core.testcase.TestCase object at", - str(self.test)) - - def test_str_pass(self): - duration = '01:01' - with mock.patch.object(self.test, 'get_duration', - return_value=duration), \ - mock.patch.object(self.test, 'is_successful', - return_value=testcase.TestCase.EX_OK): - message = str(self.test) - self.assertIn(self._project_name, message) - self.assertIn(self._case_name, message) - self.assertIn(duration, message) - self.assertIn('PASS', message) - - def test_str_fail(self): - duration = '00:59' - with mock.patch.object(self.test, 'get_duration', - return_value=duration), \ - mock.patch.object( - self.test, 'is_successful', - return_value=testcase.TestCase.EX_TESTCASE_FAILED): - message = str(self.test) - self.assertIn(self._project_name, message) - self.assertIn(self._case_name, message) - self.assertIn(duration, message) - self.assertIn('FAIL', message) - - def test_clean(self): - self.assertEqual(self.test.clean(), None) - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/test_unit.py b/functest/tests/unit/core/test_unit.py deleted file mode 100644 index ca73de67..00000000 --- a/functest/tests/unit/core/test_unit.py +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.core import unit -from functest.core import testcase - - -class PyTestSuiteRunnerTesting(unittest.TestCase): - - def setUp(self): - self.psrunner = unit.Suite() - self.psrunner.suite = "foo" - - @mock.patch('unittest.TestLoader') - def _test_run(self, mock_class=None, result=mock.Mock(), - status=testcase.TestCase.EX_OK): - with mock.patch('functest.core.unit.unittest.TextTestRunner.run', - return_value=result): - self.assertEqual(self.psrunner.run(), status) - mock_class.assert_not_called() - - def test_check_suite_null(self): - self.assertEqual(unit.Suite().suite, None) - self.psrunner.suite = None - self._test_run(result=mock.Mock(), - status=testcase.TestCase.EX_RUN_ERROR) - - def test_run_no_ut(self): - mock_result = mock.Mock(testsRun=0, errors=[], failures=[]) - self._test_run(result=mock_result, - status=testcase.TestCase.EX_RUN_ERROR) - self.assertEqual(self.psrunner.result, 0) - self.assertEqual(self.psrunner.details, - {'errors': 0, 'failures': 0, 'stream': '', - 'testsRun': 0}) - self.assertEqual(self.psrunner.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_result_ko(self): - self.psrunner.criteria = 100 - mock_result = mock.Mock(testsRun=50, errors=[('test1', 'error_msg1')], - failures=[('test2', 'failure_msg1')]) - self._test_run(result=mock_result) - self.assertEqual(self.psrunner.result, 96) - self.assertEqual(self.psrunner.details, - {'errors': 1, 'failures': 1, 'stream': '', - 'testsRun': 50}) - self.assertEqual(self.psrunner.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_result_ok(self): - mock_result = mock.Mock(testsRun=50, errors=[], - failures=[]) - self._test_run(result=mock_result) - self.assertEqual(self.psrunner.result, 100) - self.assertEqual(self.psrunner.details, - {'errors': 0, 'failures': 0, 'stream': '', - 'testsRun': 50}) - self.assertEqual(self.psrunner.is_successful(), - testcase.TestCase.EX_OK) - - @mock.patch('unittest.TestLoader') - def test_run_name_exc(self, mock_class=None): - mock_obj = mock.Mock(side_effect=ImportError) - mock_class.side_effect = mock_obj - self.assertEqual(self.psrunner.run(name='foo'), - testcase.TestCase.EX_RUN_ERROR) - mock_class.assert_called_once_with() - mock_obj.assert_called_once_with() - - @mock.patch('unittest.TestLoader') - def test_run_name(self, mock_class=None): - mock_result = mock.Mock(testsRun=50, errors=[], - failures=[]) - mock_obj = mock.Mock() - mock_class.side_effect = mock_obj - with mock.patch('functest.core.unit.unittest.TextTestRunner.run', - return_value=mock_result): - self.assertEqual(self.psrunner.run(name='foo'), - testcase.TestCase.EX_OK) - mock_class.assert_called_once_with() - mock_obj.assert_called_once_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/test_vnf.py b/functest/tests/unit/core/test_vnf.py deleted file mode 100644 index 0ac672f6..00000000 --- a/functest/tests/unit/core/test_vnf.py +++ /dev/null @@ -1,187 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.core import vnf -from functest.core import testcase -from functest.utils import constants - -from snaps.openstack.os_credentials import OSCreds - - -class VnfBaseTesting(unittest.TestCase): - """The class testing VNF.""" - # pylint: disable=missing-docstring,too-many-public-methods - - tenant_name = 'test_tenant_name' - tenant_description = 'description' - - def setUp(self): - self.test = vnf.VnfOnBoarding(project='functest', case_name='foo') - - def test_run_deploy_orch_exc(self): - with mock.patch.object(self.test, 'prepare'), \ - mock.patch.object(self.test, 'deploy_orchestrator', - side_effect=Exception) as mock_method, \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - mock_method.assert_called_with() - - def test_run_deploy_vnf_exc(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - side_effect=Exception) as mock_method: - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - mock_method.assert_called_with() - - def test_run_test_vnf_exc(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - side_effect=Exception) as mock_method: - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - mock_method.assert_called_with() - - def test_run_deploy_orch_ko(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=False), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_vnf_deploy_ko(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=False), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_vnf_test_ko(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=False): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_default(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), testcase.TestCase.EX_OK) - - @mock.patch('functest.core.vnf.OpenStackUser') - @mock.patch('functest.core.vnf.OpenStackProject') - @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials', - side_effect=Exception) - def test_prepare_exc1(self, *args): - with self.assertRaises(Exception): - self.test.prepare() - args[0].assert_called_with(os_env_file=constants.ENV_FILE) - args[1].assert_not_called() - args[2].assert_not_called() - - @mock.patch('functest.core.vnf.OpenStackUser') - @mock.patch('functest.core.vnf.OpenStackProject', side_effect=Exception) - @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials') - def test_prepare_exc2(self, *args): - with self.assertRaises(Exception): - self.test.prepare() - args[0].assert_called_with(os_env_file=constants.ENV_FILE) - args[1].assert_called_with(mock.ANY, mock.ANY) - args[2].assert_not_called() - - @mock.patch('functest.core.vnf.OpenStackUser', side_effect=Exception) - @mock.patch('functest.core.vnf.OpenStackProject') - @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials') - def test_prepare_exc3(self, *args): - with self.assertRaises(Exception): - self.test.prepare() - args[0].assert_called_with(os_env_file=constants.ENV_FILE) - args[1].assert_called_with(mock.ANY, mock.ANY) - args[2].assert_called_with(mock.ANY, mock.ANY) - - @mock.patch('functest.core.vnf.OpenStackUser') - @mock.patch('functest.core.vnf.OpenStackProject') - @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials') - def test_prepare_default(self, *args): - self.assertEqual(self.test.prepare(), testcase.TestCase.EX_OK) - args[0].assert_called_with(os_env_file=constants.ENV_FILE) - args[1].assert_called_with(mock.ANY, mock.ANY) - args[2].assert_called_with(mock.ANY, mock.ANY) - - def test_deploy_vnf_unimplemented(self): - with self.assertRaises(vnf.VnfDeploymentException): - self.test.deploy_vnf() - - def test_test_vnf_unimplemented(self): - with self.assertRaises(vnf.VnfTestException): - self.test.test_vnf() - - def test_deploy_orch_unimplemented(self): - self.assertTrue(self.test.deploy_orchestrator()) - - @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials', - return_value=OSCreds( - username='user', password='pass', - auth_url='http://foo.com:5000/v3', project_name='bar'), - side_effect=Exception) - def test_prepare_keystone_client_ko(self, *args): - with self.assertRaises(vnf.VnfPreparationException): - self.test.prepare() - args[0].assert_called_once() - - def test_vnf_clean_exc(self): - obj = mock.Mock() - obj.clean.side_effect = Exception - self.test.created_object = [obj] - self.test.clean() - obj.clean.assert_called_with() - - def test_vnf_clean(self): - obj = mock.Mock() - self.test.created_object = [obj] - self.test.clean() - obj.clean.assert_called_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/energy/__init__.py b/functest/tests/unit/energy/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/tests/unit/energy/__init__.py +++ /dev/null diff --git a/functest/tests/unit/energy/test_functest_energy.py b/functest/tests/unit/energy/test_functest_energy.py deleted file mode 100644 index fd110432..00000000 --- a/functest/tests/unit/energy/test_functest_energy.py +++ /dev/null @@ -1,371 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -*- - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Unitary test for energy module.""" -# pylint: disable=unused-argument -import logging -import os -import unittest - -import mock -import requests - -from functest.energy.energy import EnergyRecorder -import functest.energy.energy as energy - -CASE_NAME = "UNIT_TEST_CASE" -STEP_NAME = "UNIT_TEST_STEP" - -PREVIOUS_SCENARIO = "previous_scenario" -PREVIOUS_STEP = "previous_step" - - -class MockHttpResponse(object): # pylint: disable=too-few-public-methods - """Mock response for Energy recorder API.""" - - def __init__(self, text, status_code): - """Create an instance of MockHttpResponse.""" - self.text = text - self.status_code = status_code - - -API_OK = MockHttpResponse( - '{"status": "OK"}', - 200 -) -API_KO = MockHttpResponse( - '{"message": "API-KO"}', - 500 -) - -RECORDER_OK = MockHttpResponse( - '{"environment": "UNIT_TEST",' - ' "step": "string",' - ' "scenario": "' + CASE_NAME + '"}', - 200 -) -RECORDER_KO = MockHttpResponse( - '{"message": "An unhandled API exception occurred (MOCK)"}', - 500 -) -RECORDER_NOT_FOUND = MockHttpResponse( - '{"message": "Recorder not found (MOCK)"}', - 404 -) - - -# pylint: disable=too-many-public-methods -class EnergyRecorderTest(unittest.TestCase): - """Energy module unitary test suite.""" - - case_name = CASE_NAME - request_headers = {'content-type': 'application/json'} - returned_value_to_preserve = "value" - exception_message_to_preserve = "exception_message" - - @staticmethod - def _set_env_creds(): - """Set config values.""" - os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888" - os.environ["ENERGY_RECORDER_API_USER"] = "user" - os.environ["ENERGY_RECORDER_API_PASSWORD"] = "password" - - @staticmethod - def _set_env_nocreds(): - """Set config values.""" - os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888" - del os.environ["ENERGY_RECORDER_API_USER"] - del os.environ["ENERGY_RECORDER_API_PASSWORD"] - - @mock.patch('functest.energy.energy.requests.post', - return_value=RECORDER_OK) - def test_start(self, post_mock=None, get_mock=None): - """EnergyRecorder.start method (regular case).""" - self.test_load_config() - self.assertTrue(EnergyRecorder.start(self.case_name)) - post_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - data=mock.ANY, - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.requests.post', - side_effect=Exception("Internal execution error (MOCK)")) - def test_start_error(self, post_mock=None): - """EnergyRecorder.start method (error in method).""" - self.test_load_config() - self.assertFalse(EnergyRecorder.start(self.case_name)) - post_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - data=mock.ANY, - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.EnergyRecorder.load_config', - side_effect=Exception("Internal execution error (MOCK)")) - def test_start_exception(self, conf_loader_mock=None): - """EnergyRecorder.start test with exception during execution.""" - start_status = EnergyRecorder.start(CASE_NAME) - self.assertFalse(start_status) - - @mock.patch('functest.energy.energy.requests.post', - return_value=RECORDER_KO) - def test_start_api_error(self, post_mock=None): - """EnergyRecorder.start method (API error).""" - self.test_load_config() - self.assertFalse(EnergyRecorder.start(self.case_name)) - post_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - data=mock.ANY, - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.requests.post', - return_value=RECORDER_OK) - def test_set_step(self, post_mock=None): - """EnergyRecorder.set_step method (regular case).""" - self.test_load_config() - self.assertTrue(EnergyRecorder.set_step(STEP_NAME)) - post_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"] + "/step", - auth=EnergyRecorder.energy_recorder_api["auth"], - data=mock.ANY, - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.requests.post', - return_value=RECORDER_KO) - def test_set_step_api_error(self, post_mock=None): - """EnergyRecorder.set_step method (API error).""" - self.test_load_config() - self.assertFalse(EnergyRecorder.set_step(STEP_NAME)) - post_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"] + "/step", - auth=EnergyRecorder.energy_recorder_api["auth"], - data=mock.ANY, - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.requests.post', - side_effect=Exception("Internal execution error (MOCK)")) - def test_set_step_error(self, post_mock=None): - """EnergyRecorder.set_step method (method error).""" - self.test_load_config() - self.assertFalse(EnergyRecorder.set_step(STEP_NAME)) - post_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"] + "/step", - auth=EnergyRecorder.energy_recorder_api["auth"], - data=mock.ANY, - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.EnergyRecorder.load_config', - side_effect=requests.exceptions.ConnectionError()) - def test_set_step_connection_error(self, conf_loader_mock=None): - """EnergyRecorder.start test with exception during execution.""" - step_status = EnergyRecorder.set_step(STEP_NAME) - self.assertFalse(step_status) - - @mock.patch('functest.energy.energy.requests.delete', - return_value=RECORDER_OK) - def test_stop(self, delete_mock=None): - """EnergyRecorder.stop method (regular case).""" - self.test_load_config() - self.assertTrue(EnergyRecorder.stop()) - delete_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.requests.delete', - return_value=RECORDER_KO) - def test_stop_api_error(self, delete_mock=None): - """EnergyRecorder.stop method (API Error).""" - self.test_load_config() - self.assertFalse(EnergyRecorder.stop()) - delete_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @mock.patch('functest.energy.energy.requests.delete', - side_effect=Exception("Internal execution error (MOCK)")) - def test_stop_error(self, delete_mock=None): - """EnergyRecorder.stop method (method error).""" - self.test_load_config() - self.assertFalse(EnergyRecorder.stop()) - delete_mock.assert_called_once_with( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - headers=self.request_headers, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - - @energy.enable_recording - def __decorated_method(self): - """Call with to energy recorder decorators.""" - return self.returned_value_to_preserve - - @energy.enable_recording - def __decorated_method_with_ex(self): - """Call with to energy recorder decorators.""" - raise Exception(self.exception_message_to_preserve) - - @mock.patch("functest.energy.energy.EnergyRecorder.get_current_scenario", - return_value=None) - @mock.patch("functest.energy.energy.EnergyRecorder") - def test_decorators(self, - recorder_mock=None, - cur_scenario_mock=None): - """Test energy module decorators.""" - self.__decorated_method() - calls = [mock.call.start(self.case_name), - mock.call.stop()] - recorder_mock.assert_has_calls(calls) - - @mock.patch("functest.energy.energy.EnergyRecorder.get_current_scenario", - return_value={"scenario": PREVIOUS_SCENARIO, - "step": PREVIOUS_STEP}) - @mock.patch("functest.energy.energy.EnergyRecorder") - def test_decorators_with_previous(self, - recorder_mock=None, - cur_scenario_mock=None): - """Test energy module decorators.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self._set_env_creds() - self.__decorated_method() - calls = [mock.call.start(self.case_name), - mock.call.submit_scenario(PREVIOUS_SCENARIO, - PREVIOUS_STEP)] - recorder_mock.assert_has_calls(calls, True) - - def test_decorator_preserve_return(self): - """Test that decorator preserve method returned value.""" - self.test_load_config() - self.assertTrue( - self.__decorated_method() == self.returned_value_to_preserve - ) - - @mock.patch( - "functest.energy.energy.finish_session") - def test_decorator_preserve_ex(self, finish_mock=None): - """Test that decorator preserve method exceptions.""" - self.test_load_config() - with self.assertRaises(Exception) as context: - self.__decorated_method_with_ex() - self.assertTrue( - self.exception_message_to_preserve in str(context.exception) - ) - self.assertTrue(finish_mock.called) - - @mock.patch("functest.energy.energy.requests.get", - return_value=API_OK) - def test_load_config(self, loader_mock=None, get_mock=None): - """Test load config.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self._set_env_creds() - EnergyRecorder.energy_recorder_api = None - EnergyRecorder.load_config() - - self.assertEquals( - EnergyRecorder.energy_recorder_api["auth"], - ("user", "password") - ) - self.assertEquals( - EnergyRecorder.energy_recorder_api["uri"], - "http://pod-uri:8888/recorders/environment/MOCK_POD" - ) - - @mock.patch("functest.energy.energy.requests.get", - return_value=API_OK) - def test_load_config_no_creds(self, loader_mock=None, get_mock=None): - """Test load config without creds.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self._set_env_nocreds() - EnergyRecorder.energy_recorder_api = None - EnergyRecorder.load_config() - self.assertEquals(EnergyRecorder.energy_recorder_api["auth"], None) - self.assertEquals( - EnergyRecorder.energy_recorder_api["uri"], - "http://pod-uri:8888/recorders/environment/MOCK_POD" - ) - - @mock.patch("functest.energy.energy.requests.get", - return_value=API_OK) - def test_load_config_ex(self, loader_mock=None, get_mock=None): - """Test load config with exception.""" - for key in ['NODE_NAME', 'ENERGY_RECORDER_API_URL']: - os.environ[key] = '' - with self.assertRaises(AssertionError): - EnergyRecorder.energy_recorder_api = None - EnergyRecorder.load_config() - self.assertEquals(EnergyRecorder.energy_recorder_api, None) - - @mock.patch("functest.energy.energy.requests.get", - return_value=API_KO) - def test_load_config_api_ko(self, loader_mock=None, get_mock=None): - """Test load config with API unavailable.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self._set_env_creds() - EnergyRecorder.energy_recorder_api = None - EnergyRecorder.load_config() - self.assertEquals(EnergyRecorder.energy_recorder_api["available"], - False) - - @mock.patch('functest.energy.energy.requests.get', - return_value=RECORDER_OK) - def test_get_current_scenario(self, loader_mock=None, get_mock=None): - """Test get_current_scenario.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self.test_load_config() - scenario = EnergyRecorder.get_current_scenario() - self.assertTrue(scenario is not None) - - @mock.patch('functest.energy.energy.requests.get', - return_value=RECORDER_NOT_FOUND) - def test_current_scenario_not_found(self, get_mock=None): - """Test get current scenario not existing.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self.test_load_config() - scenario = EnergyRecorder.get_current_scenario() - self.assertTrue(scenario is None) - - @mock.patch('functest.energy.energy.requests.get', - return_value=RECORDER_KO) - def test_current_scenario_api_error(self, get_mock=None): - """Test get current scenario with API error.""" - os.environ['NODE_NAME'] = 'MOCK_POD' - self.test_load_config() - scenario = EnergyRecorder.get_current_scenario() - self.assertTrue(scenario is None) - - @mock.patch('functest.energy.energy.EnergyRecorder.load_config', - side_effect=Exception("Internal execution error (MOCK)")) - def test_current_scenario_exception(self, get_mock=None): - """Test get current scenario with exception.""" - scenario = EnergyRecorder.get_current_scenario() - self.assertTrue(scenario is None) - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/utils/__init__.py b/functest/tests/unit/utils/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/tests/unit/utils/__init__.py +++ /dev/null diff --git a/functest/tests/unit/utils/test_decorators.py b/functest/tests/unit/utils/test_decorators.py deleted file mode 100644 index b4cdf6ff..00000000 --- a/functest/tests/unit/utils/test_decorators.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the class required to fully cover decorators.""" - -from datetime import datetime -import errno -import json -import logging -import os -import unittest - -import mock - -from functest.core import testcase -from functest.utils import decorators - -__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>" - -DIR = '/dev' -FILE = '{}/null'.format(DIR) -URL = 'file://{}'.format(FILE) - - -class DecoratorsTesting(unittest.TestCase): - # pylint: disable=missing-docstring - - _case_name = 'base' - _project_name = 'functest' - _start_time = 1.0 - _stop_time = 2.0 - _result = 'PASS' - _version = 'unknown' - _build_tag = 'none' - _node_name = 'bar' - _deploy_scenario = 'foo' - _installer_type = 'debian' - - def setUp(self): - os.environ['INSTALLER_TYPE'] = self._installer_type - os.environ['DEPLOY_SCENARIO'] = self._deploy_scenario - os.environ['NODE_NAME'] = self._node_name - os.environ['BUILD_TAG'] = self._build_tag - - def test_wraps(self): - self.assertEqual(testcase.TestCase.push_to_db.__name__, - "push_to_db") - - def _get_json(self): - stop_time = datetime.fromtimestamp(self._stop_time).strftime( - '%Y-%m-%d %H:%M:%S') - start_time = datetime.fromtimestamp(self._start_time).strftime( - '%Y-%m-%d %H:%M:%S') - data = {'project_name': self._project_name, - 'stop_date': stop_time, 'start_date': start_time, - 'case_name': self._case_name, 'build_tag': self._build_tag, - 'pod_name': self._node_name, 'installer': self._installer_type, - 'scenario': self._deploy_scenario, 'version': self._version, - 'details': {}, 'criteria': self._result} - return json.dumps(data, sort_keys=True) - - def _get_testcase(self): - test = testcase.TestCase( - project_name=self._project_name, case_name=self._case_name) - test.start_time = self._start_time - test.stop_time = self._stop_time - test.result = 100 - test.details = {} - return test - - @mock.patch('requests.post') - def test_http_shema(self, *args): - os.environ['TEST_DB_URL'] = 'http://127.0.0.1' - test = self._get_testcase() - self.assertEqual(test.push_to_db(), testcase.TestCase.EX_OK) - args[0].assert_called_once_with( - 'http://127.0.0.1', data=self._get_json(), - headers={'Content-Type': 'application/json'}) - - def test_wrong_shema(self): - os.environ['TEST_DB_URL'] = '/dev/null' - test = self._get_testcase() - self.assertEqual( - test.push_to_db(), testcase.TestCase.EX_PUSH_TO_DB_ERROR) - - def _test_dump(self): - os.environ['TEST_DB_URL'] = URL - with mock.patch.object(decorators, 'open', mock.mock_open(), - create=True) as mock_open: - test = self._get_testcase() - self.assertEqual(test.push_to_db(), testcase.TestCase.EX_OK) - mock_open.assert_called_once_with(FILE, 'a') - handle = mock_open() - call_args, _ = handle.write.call_args - self.assertIn('POST', call_args[0]) - self.assertIn(self._get_json(), call_args[0]) - - @mock.patch('os.makedirs') - def test_default_dump(self, mock_method=None): - self._test_dump() - mock_method.assert_called_once_with(DIR) - - @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, '')) - def test_makedirs_dir_exists(self, mock_method=None): - self._test_dump() - mock_method.assert_called_once_with(DIR) - - @mock.patch('os.makedirs', side_effect=OSError) - def test_makedirs_exc(self, *args): - os.environ['TEST_DB_URL'] = URL - test = self._get_testcase() - self.assertEqual( - test.push_to_db(), testcase.TestCase.EX_PUSH_TO_DB_ERROR) - args[0].assert_called_once_with(DIR) - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/utils/test_env.py b/functest/tests/unit/utils/test_env.py deleted file mode 100644 index 49d2d974..00000000 --- a/functest/tests/unit/utils/test_env.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2018 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import os -import unittest - -from six.moves import reload_module - -from functest.utils import env - - -class EnvTesting(unittest.TestCase): - # pylint: disable=missing-docstring - - def setUp(self): - os.environ['FOO'] = 'foo' - os.environ['BUILD_TAG'] = 'master' - os.environ['CI_LOOP'] = 'weekly' - - def test_get_unset_unknown_env(self): - del os.environ['FOO'] - self.assertEqual(env.get('FOO'), None) - - def test_get_unknown_env(self): - self.assertEqual(env.get('FOO'), 'foo') - reload_module(env) - - def test_get_unset_env(self): - del os.environ['CI_LOOP'] - self.assertEqual( - env.get('CI_LOOP'), env.INPUTS['CI_LOOP']) - - def test_get_env(self): - self.assertEqual( - env.get('CI_LOOP'), 'weekly') - - def test_get_unset_env2(self): - del os.environ['BUILD_TAG'] - self.assertEqual( - env.get('BUILD_TAG'), env.INPUTS['BUILD_TAG']) - - def test_get_env2(self): - self.assertEqual(env.get('BUILD_TAG'), 'master') - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/utils/__init__.py b/functest/utils/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/functest/utils/__init__.py +++ /dev/null diff --git a/functest/utils/constants.py b/functest/utils/constants.py deleted file mode 100644 index 0bc00d80..00000000 --- a/functest/utils/constants.py +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env python - -# pylint: disable=missing-docstring - -import pkg_resources - -CONFIG_FUNCTEST_YAML = pkg_resources.resource_filename( - 'functest', 'ci/config_functest.yaml') - -ENV_FILE = '/home/opnfv/functest/conf/env_file' diff --git a/functest/utils/decorators.py b/functest/utils/decorators.py deleted file mode 100644 index 230a99e7..00000000 --- a/functest/utils/decorators.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import errno -import functools -import os - -import mock -import requests.sessions -from six.moves import urllib - - -def can_dump_request_to_file(method): - - def dump_preparedrequest(request, **kwargs): - # pylint: disable=unused-argument - parseresult = urllib.parse.urlparse(request.url) - if parseresult.scheme == "file": - try: - dirname = os.path.dirname(parseresult.path) - os.makedirs(dirname) - except OSError as ex: - if ex.errno != errno.EEXIST: - raise - with open(parseresult.path, 'a') as dumpfile: - headers = "" - for key in request.headers: - headers += key + " " + request.headers[key] + "\n" - message = "{} {}\n{}\n{}\n\n\n".format( - request.method, request.url, headers, request.body) - dumpfile.write(message) - return mock.Mock() - - def patch_request(method, url, **kwargs): - with requests.sessions.Session() as session: - parseresult = urllib.parse.urlparse(url) - if parseresult.scheme == "file": - with mock.patch.object(session, 'send', - side_effect=dump_preparedrequest): - return session.request(method=method, url=url, **kwargs) - else: - return session.request(method=method, url=url, **kwargs) - - @functools.wraps(method) - def hook(*args, **kwargs): - with mock.patch('requests.api.request', side_effect=patch_request): - return method(*args, **kwargs) - - return hook diff --git a/functest/utils/env.py b/functest/utils/env.py deleted file mode 100644 index aa2da0b5..00000000 --- a/functest/utils/env.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2018 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import os - -import prettytable - -INPUTS = { - 'EXTERNAL_NETWORK': None, - 'CI_LOOP': 'daily', - 'DEPLOY_SCENARIO': 'os-nosdn-nofeature-noha', - 'INSTALLER_TYPE': None, - 'SDN_CONTROLLER_IP': None, - 'BUILD_TAG': None, - 'NODE_NAME': None, - 'POD_ARCH': None, - 'TEST_DB_URL': 'http://testresults.opnfv.org/test/api/v1/results', - 'ENERGY_RECORDER_API_URL': 'http://energy.opnfv.fr/resources', - 'ENERGY_RECORDER_API_USER': None, - 'ENERGY_RECORDER_API_PASSWORD': None -} - - -def get(env_var): - if env_var not in INPUTS.keys(): - return os.environ.get(env_var, None) - return os.environ.get(env_var, INPUTS[env_var]) - - -def string(): - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['env var', 'value']) - for env_var in INPUTS: - msg.add_row([env_var, get(env_var) if get(env_var) else '']) - return msg |