From 2aab5c48df64b044ab9bae6e883e6e0acaabbf52 Mon Sep 17 00:00:00 2001 From: Cédric Ollivier Date: Wed, 28 Feb 2018 09:35:49 +0100 Subject: Rename all Functest refs to Xtesting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It mainly renames python modules and then the related documentation config files. Change-Id: I186010bb88d3d39afe7b8fd1ebcef9c690cc1282 Signed-off-by: Cédric Ollivier --- functest/__init__.py | 0 functest/ci/__init__.py | 0 functest/ci/logging.ini | 80 ---- functest/ci/run_tests.py | 302 --------------- functest/ci/testcases.yaml | 424 --------------------- functest/ci/tier_builder.py | 106 ------ functest/ci/tier_handler.py | 174 --------- functest/core/__init__.py | 0 functest/core/feature.py | 133 ------- functest/core/robotframework.py | 126 ------ functest/core/testcase.py | 227 ----------- functest/core/unit.py | 92 ----- functest/core/vnf.py | 205 ---------- functest/energy/__init__.py | 0 functest/energy/energy.py | 336 ---------------- functest/tests/__init__.py | 0 functest/tests/unit/__init__.py | 0 functest/tests/unit/ci/__init__.py | 0 functest/tests/unit/ci/test_run_tests.py | 267 ------------- functest/tests/unit/ci/test_tier_builder.py | 93 ----- functest/tests/unit/ci/test_tier_handler.py | 139 ------- functest/tests/unit/core/__init__.py | 0 functest/tests/unit/core/test_feature.py | 117 ------ functest/tests/unit/core/test_robotframework.py | 199 ---------- functest/tests/unit/core/test_testcase.py | 277 -------------- functest/tests/unit/core/test_unit.py | 98 ----- functest/tests/unit/core/test_vnf.py | 187 --------- functest/tests/unit/energy/__init__.py | 0 functest/tests/unit/energy/test_functest_energy.py | 371 ------------------ functest/tests/unit/utils/__init__.py | 0 functest/tests/unit/utils/test_decorators.py | 125 ------ functest/tests/unit/utils/test_env.py | 57 --- functest/utils/__init__.py | 0 functest/utils/constants.py | 10 - functest/utils/decorators.py | 57 --- functest/utils/env.py | 44 --- 36 files changed, 4246 deletions(-) delete mode 100644 functest/__init__.py delete mode 100644 functest/ci/__init__.py delete mode 100644 functest/ci/logging.ini delete mode 100644 functest/ci/run_tests.py delete mode 100644 functest/ci/testcases.yaml delete mode 100644 functest/ci/tier_builder.py delete mode 100644 functest/ci/tier_handler.py delete mode 100644 functest/core/__init__.py delete mode 100644 functest/core/feature.py delete mode 100644 functest/core/robotframework.py delete mode 100644 functest/core/testcase.py delete mode 100644 functest/core/unit.py delete mode 100644 functest/core/vnf.py delete mode 100644 functest/energy/__init__.py delete mode 100644 functest/energy/energy.py delete mode 100644 functest/tests/__init__.py delete mode 100644 functest/tests/unit/__init__.py delete mode 100644 functest/tests/unit/ci/__init__.py delete mode 100644 functest/tests/unit/ci/test_run_tests.py delete mode 100644 functest/tests/unit/ci/test_tier_builder.py delete mode 100644 functest/tests/unit/ci/test_tier_handler.py delete mode 100644 functest/tests/unit/core/__init__.py delete mode 100644 functest/tests/unit/core/test_feature.py delete mode 100644 functest/tests/unit/core/test_robotframework.py delete mode 100644 functest/tests/unit/core/test_testcase.py delete mode 100644 functest/tests/unit/core/test_unit.py delete mode 100644 functest/tests/unit/core/test_vnf.py delete mode 100644 functest/tests/unit/energy/__init__.py delete mode 100644 functest/tests/unit/energy/test_functest_energy.py delete mode 100644 functest/tests/unit/utils/__init__.py delete mode 100644 functest/tests/unit/utils/test_decorators.py delete mode 100644 functest/tests/unit/utils/test_env.py delete mode 100644 functest/utils/__init__.py delete mode 100644 functest/utils/constants.py delete mode 100644 functest/utils/decorators.py delete mode 100644 functest/utils/env.py (limited to 'functest') diff --git a/functest/__init__.py b/functest/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/ci/__init__.py b/functest/ci/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/ci/logging.ini b/functest/ci/logging.ini deleted file mode 100644 index f1ab7241..00000000 --- a/functest/ci/logging.ini +++ /dev/null @@ -1,80 +0,0 @@ -[loggers] -keys=root,functest,api,ci,cli,core,energy,opnfv_tests,utils - -[handlers] -keys=console,wconsole,file,null - -[formatters] -keys=standard - -[logger_root] -level=NOTSET -handlers=null - -[logger_functest] -level=NOTSET -handlers=file -qualname=functest - -[logger_api] -level=NOTSET -handlers=wconsole -qualname=functest.api - -[logger_ci] -level=NOTSET -handlers=console -qualname=functest.ci - -[logger_cli] -level=NOTSET -handlers=wconsole -qualname=functest.cli - -[logger_core] -level=NOTSET -handlers=console -qualname=functest.core - -[logger_energy] -level=NOTSET -handlers=wconsole -qualname=functest.energy - -[logger_opnfv_tests] -level=NOTSET -handlers=wconsole -qualname=functest.opnfv_tests - -[logger_utils] -level=NOTSET -handlers=wconsole -qualname=functest.utils - -[handler_null] -class=NullHandler -level=NOTSET -formatter=standard -args=() - -[handler_console] -class=StreamHandler -level=INFO -formatter=standard -args=(sys.stdout,) - -[handler_wconsole] -class=StreamHandler -level=WARN -formatter=standard -args=(sys.stdout,) - -[handler_file] -class=FileHandler -level=DEBUG -formatter=standard -args=("/home/opnfv/functest/results/functest.log",) - -[formatter_standard] -format=%(asctime)s - %(name)s - %(levelname)s - %(message)s -datefmt= diff --git a/functest/ci/run_tests.py b/functest/ci/run_tests.py deleted file mode 100644 index 651a3851..00000000 --- a/functest/ci/run_tests.py +++ /dev/null @@ -1,302 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Ericsson AB and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -""" The entry of running tests: -1) Parses functest/ci/testcases.yaml to check which testcase(s) to be run -2) Execute the common operations on every testcase (run, push results to db...) -3) Return the right status code -""" - -import argparse -import importlib -import logging -import logging.config -import os -import re -import sys -import textwrap -import pkg_resources - -import enum -import prettytable -import yaml - -from functest.ci import tier_builder -from functest.core import testcase -from functest.utils import constants -from functest.utils import env - -LOGGER = logging.getLogger('functest.ci.run_tests') - - -class Result(enum.Enum): - """The overall result in enumerated type""" - # pylint: disable=too-few-public-methods - EX_OK = os.EX_OK - EX_ERROR = -1 - - -class BlockingTestFailed(Exception): - """Exception when the blocking test fails""" - pass - - -class TestNotEnabled(Exception): - """Exception when the test is not enabled""" - pass - - -class RunTestsParser(object): - """Parser to run tests""" - # pylint: disable=too-few-public-methods - - def __init__(self): - self.parser = argparse.ArgumentParser() - self.parser.add_argument("-t", "--test", dest="test", action='store', - help="Test case or tier (group of tests) " - "to be executed. It will run all the test " - "if not specified.") - self.parser.add_argument("-n", "--noclean", help="Do not clean " - "OpenStack resources after running each " - "test (default=false).", - action="store_true") - self.parser.add_argument("-r", "--report", help="Push results to " - "database (default=false).", - action="store_true") - - def parse_args(self, argv=None): - """Parse arguments. - - It can call sys.exit if arguments are incorrect. - - Returns: - the arguments from cmdline - """ - return vars(self.parser.parse_args(argv)) - - -class Runner(object): - """Runner class""" - - def __init__(self): - self.executed_test_cases = {} - self.overall_result = Result.EX_OK - self.clean_flag = True - self.report_flag = False - self.tiers = tier_builder.TierBuilder( - env.get('INSTALLER_TYPE'), - env.get('DEPLOY_SCENARIO'), - pkg_resources.resource_filename('functest', 'ci/testcases.yaml')) - - @staticmethod - def source_envfile(rc_file=constants.ENV_FILE): - """Source the env file passed as arg""" - if not os.path.isfile(rc_file): - LOGGER.debug("No env file %s found", rc_file) - return - with open(rc_file, "r") as rcfd: - for line in rcfd: - var = (line.rstrip('"\n').replace('export ', '').split( - "=") if re.search(r'(.*)=(.*)', line) else None) - # The two next lines should be modified as soon as rc_file - # conforms with common rules. Be aware that it could induce - # issues if value starts with ' - if var: - key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0]) - value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:])) - os.environ[key] = value - rcfd.seek(0, 0) - LOGGER.info("Sourcing env file %s\n\n%s", rc_file, rcfd.read()) - - @staticmethod - def get_dict_by_test(testname): - # pylint: disable=bad-continuation,missing-docstring - with open(pkg_resources.resource_filename( - 'functest', 'ci/testcases.yaml')) as tyaml: - testcases_yaml = yaml.safe_load(tyaml) - for dic_tier in testcases_yaml.get("tiers"): - for dic_testcase in dic_tier['testcases']: - if dic_testcase['case_name'] == testname: - return dic_testcase - LOGGER.error('Project %s is not defined in testcases.yaml', testname) - return None - - @staticmethod - def get_run_dict(testname): - """Obtain the 'run' block of the testcase from testcases.yaml""" - try: - dic_testcase = Runner.get_dict_by_test(testname) - if not dic_testcase: - LOGGER.error("Cannot get %s's config options", testname) - elif 'run' in dic_testcase: - return dic_testcase['run'] - return None - except Exception: # pylint: disable=broad-except - LOGGER.exception("Cannot get %s's config options", testname) - return None - - def run_test(self, test): - """Run one test case""" - if not test.is_enabled(): - raise TestNotEnabled( - "The test case {} is not enabled".format(test.get_name())) - LOGGER.info("Running test case '%s'...", test.get_name()) - result = testcase.TestCase.EX_RUN_ERROR - run_dict = self.get_run_dict(test.get_name()) - if run_dict: - try: - module = importlib.import_module(run_dict['module']) - cls = getattr(module, run_dict['class']) - test_dict = Runner.get_dict_by_test(test.get_name()) - test_case = cls(**test_dict) - self.executed_test_cases[test.get_name()] = test_case - try: - kwargs = run_dict['args'] - test_case.run(**kwargs) - except KeyError: - test_case.run() - if self.report_flag: - test_case.push_to_db() - if test.get_project() == "functest": - result = test_case.is_successful() - else: - result = testcase.TestCase.EX_OK - LOGGER.info("Test result:\n\n%s\n", test_case) - if self.clean_flag: - test_case.clean() - except ImportError: - LOGGER.exception("Cannot import module %s", run_dict['module']) - except AttributeError: - LOGGER.exception("Cannot get class %s", run_dict['class']) - else: - raise Exception("Cannot import the class for the test case.") - return result - - def run_tier(self, tier): - """Run one tier""" - tier_name = tier.get_name() - tests = tier.get_tests() - if not tests: - LOGGER.info("There are no supported test cases in this tier " - "for the given scenario") - self.overall_result = Result.EX_ERROR - else: - LOGGER.info("Running tier '%s'", tier_name) - for test in tests: - self.run_test(test) - test_case = self.executed_test_cases[test.get_name()] - if test_case.is_successful() != testcase.TestCase.EX_OK: - LOGGER.error("The test case '%s' failed.", test.get_name()) - if test.get_project() == "functest": - self.overall_result = Result.EX_ERROR - if test.is_blocking(): - raise BlockingTestFailed( - "The test case {} failed and is blocking".format( - test.get_name())) - return self.overall_result - - def run_all(self): - """Run all available testcases""" - tiers_to_run = [] - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['tiers', 'order', 'CI Loop', 'description', - 'testcases']) - for tier in self.tiers.get_tiers(): - ci_loop = env.get('CI_LOOP') - if (tier.get_tests() and - re.search(ci_loop, tier.get_ci_loop()) is not None): - tiers_to_run.append(tier) - msg.add_row([tier.get_name(), tier.get_order(), - tier.get_ci_loop(), - textwrap.fill(tier.description, width=40), - textwrap.fill(' '.join([str(x.get_name( - )) for x in tier.get_tests()]), width=40)]) - LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg) - for tier in tiers_to_run: - self.run_tier(tier) - - def main(self, **kwargs): - """Entry point of class Runner""" - if 'noclean' in kwargs: - self.clean_flag = not kwargs['noclean'] - if 'report' in kwargs: - self.report_flag = kwargs['report'] - try: - LOGGER.info("Deployment description:\n\n%s\n", env.string()) - self.source_envfile() - if 'test' in kwargs: - LOGGER.debug("Test args: %s", kwargs['test']) - if self.tiers.get_tier(kwargs['test']): - self.run_tier(self.tiers.get_tier(kwargs['test'])) - elif self.tiers.get_test(kwargs['test']): - result = self.run_test( - self.tiers.get_test(kwargs['test'])) - if result != testcase.TestCase.EX_OK: - LOGGER.error("The test case '%s' failed.", - kwargs['test']) - self.overall_result = Result.EX_ERROR - elif kwargs['test'] == "all": - self.run_all() - else: - LOGGER.error("Unknown test case or tier '%s', or not " - "supported by the given scenario '%s'.", - kwargs['test'], - env.get('DEPLOY_SCENARIO')) - LOGGER.debug("Available tiers are:\n\n%s", - self.tiers) - return Result.EX_ERROR - else: - self.run_all() - except BlockingTestFailed: - pass - except Exception: # pylint: disable=broad-except - LOGGER.exception("Failures when running testcase(s)") - self.overall_result = Result.EX_ERROR - if not self.tiers.get_test(kwargs['test']): - self.summary(self.tiers.get_tier(kwargs['test'])) - LOGGER.info("Execution exit value: %s", self.overall_result) - return self.overall_result - - def summary(self, tier=None): - """To generate functest report showing the overall results""" - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'project', 'tier', - 'duration', 'result']) - tiers = [tier] if tier else self.tiers.get_tiers() - for each_tier in tiers: - for test in each_tier.get_tests(): - try: - test_case = self.executed_test_cases[test.get_name()] - except KeyError: - msg.add_row([test.get_name(), test.get_project(), - each_tier.get_name(), "00:00", "SKIP"]) - else: - result = 'PASS' if(test_case.is_successful( - ) == test_case.EX_OK) else 'FAIL' - msg.add_row( - [test_case.case_name, test_case.project_name, - self.tiers.get_tier_name(test_case.case_name), - test_case.get_duration(), result]) - for test in each_tier.get_skipped_test(): - msg.add_row([test.get_name(), test.get_project(), - each_tier.get_name(), "00:00", "SKIP"]) - LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg) - - -def main(): - """Entry point""" - logging.config.fileConfig(pkg_resources.resource_filename( - 'functest', 'ci/logging.ini')) - logging.captureWarnings(True) - parser = RunTestsParser() - args = parser.parse_args(sys.argv[1:]) - runner = Runner() - return runner.main(**args).value diff --git a/functest/ci/testcases.yaml b/functest/ci/testcases.yaml deleted file mode 100644 index 953d2aea..00000000 --- a/functest/ci/testcases.yaml +++ /dev/null @@ -1,424 +0,0 @@ ---- -tiers: - - - name: healthcheck - order: 0 - ci_loop: '(daily)|(weekly)' - description: >- - First tier to be executed to verify the basic - operations in the VIM. - testcases: - - - case_name: connection_check - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies the retrieval of OpenStack clients: - Keystone, Glance, Neutron and Nova and may perform some - simple queries. When the config value of - snaps.use_keystone is True, functest must have access to - the cloud's private network. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: - 'functest.opnfv_tests.openstack.snaps.connection_check' - class: 'ConnectionCheck' - - - - case_name: api_check - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies the retrieval of OpenStack clients: - Keystone, Glance, Neutron and Nova and may perform some - simple queries. When the config value of - snaps.use_keystone is True, functest must have access to - the cloud's private network. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '^((?!lxd).)*$' - run: - module: 'functest.opnfv_tests.openstack.snaps.api_check' - class: 'ApiCheck' - - - - case_name: snaps_health_check - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case creates executes the SimpleHealthCheck - Python test class which creates an, image, flavor, network, - and Cirros VM instance and observes the console output to - validate the single port obtains the correct IP address. - dependencies: - installer: '' - scenario: '^((?!lxd).)*$' - run: - module: 'functest.opnfv_tests.openstack.snaps.health_check' - class: 'HealthCheck' - - - - name: smoke - order: 1 - ci_loop: '(daily)|(weekly)' - description: >- - Set of basic Functional tests to validate the OPNFV scenarios. - testcases: - - - case_name: vping_ssh - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies: 1) SSH to an instance using - floating IPs over the public network. 2) Connectivity - between 2 instances over a private network. - dependencies: - installer: '' - scenario: '^((?!odl_l3|odl-bgpvpn|gluon).)*$' - run: - module: 'functest.opnfv_tests.openstack.vping.vping_ssh' - class: 'VPingSSH' - - - - case_name: vping_userdata - project_name: functest - criteria: 100 - blocking: true - description: >- - This test case verifies: 1) Boot a VM with given userdata. - 2) Connectivity between 2 instances over a private network. - dependencies: - installer: '' - scenario: '^((?!lxd).)*$' - run: - module: - 'functest.opnfv_tests.openstack.vping.vping_userdata' - class: 'VPingUserdata' - - - - case_name: tempest_smoke_serial - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs the smoke subset of the OpenStack - Tempest suite. The list of test cases is generated by - Tempest automatically and depends on the parameters of - the OpenStack deplopyment. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.tempest.tempest' - class: 'TempestSmokeSerial' - - - - case_name: rally_sanity - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs a sub group of tests of the OpenStack - Rally suite in smoke mode. - dependencies: - installer: '' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.rally.rally' - class: 'RallySanity' - - - - case_name: refstack_defcore - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs a sub group of tests of the OpenStack - Defcore testcases by using refstack client. - dependencies: - installer: '' - scenario: '' - run: - module: - 'functest.opnfv_tests.openstack.refstack_client.refstack_client' - class: 'RefstackClient' - - - - case_name: odl - project_name: functest - criteria: 100 - blocking: false - description: >- - Test Suite for the OpenDaylight SDN Controller. It - integrates some test suites from upstream using - Robot as the test framework. - dependencies: - installer: '' - scenario: 'odl' - run: - module: 'functest.opnfv_tests.sdn.odl.odl' - class: 'ODLTests' - args: - suites: - - /src/odl_test/csit/suites/integration/basic - - /src/odl_test/csit/suites/openstack/neutron - - - - case_name: odl_netvirt - project_name: functest - criteria: 100 - blocking: false - description: >- - Test Suite for the OpenDaylight SDN Controller when - the NetVirt features are installed. It integrates - some test suites from upstream using Robot as the - test framework. - dependencies: - installer: 'apex' - scenario: 'os-odl_l3-nofeature' - run: - module: 'functest.opnfv_tests.sdn.odl.odl' - class: 'ODLTests' - args: - suites: - - /src/odl_test/csit/suites/integration/basic - - /src/odl_test/csit/suites/openstack/neutron - - /src/odl_test/csit/suites/openstack/connectivity - - - - case_name: snaps_smoke - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case contains tests that setup and destroy - environments with VMs with and without Floating IPs - with a newly created user and project. Set the config - value snaps.use_floating_ips (True|False) to toggle - this functionality. When the config value of - snaps.use_keystone is True, functest must have access to - the cloud's private network. - - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '^((?!lxd).)*$' - run: - module: 'functest.opnfv_tests.openstack.snaps.smoke' - class: 'SnapsSmoke' - - - - name: features - order: 2 - ci_loop: '(daily)|(weekly)' - description: >- - Test suites from feature projects - integrated in functest - testcases: - - - case_name: doctor-notification - project_name: doctor - criteria: 100 - blocking: false - description: >- - Test suite from Doctor project. - dependencies: - installer: 'apex' - scenario: '^((?!fdio).)*$' - run: - module: 'functest.core.feature' - class: 'BashFeature' - args: - cmd: 'doctor-test' - - - - case_name: bgpvpn - project_name: sdnvpn - criteria: 100 - blocking: false - description: >- - Test suite from SDNVPN project. - dependencies: - installer: '(fuel)|(apex)|(netvirt)' - scenario: 'bgpvpn' - run: - module: 'sdnvpn.test.functest.run_sdnvpn_tests' - class: 'SdnvpnFunctest' - - - - case_name: functest-odl-sfc - project_name: sfc - criteria: 100 - blocking: false - description: >- - Test suite for odl-sfc to test two chains with one SF and - one chain with two SFs - dependencies: - installer: '' - scenario: 'odl.*sfc' - run: - module: 'functest.core.feature' - class: 'BashFeature' - args: - cmd: 'run_sfc_tests.py' - - - - case_name: barometercollectd - project_name: barometer - criteria: 100 - blocking: false - description: >- - Test suite for the Barometer project. Separate tests verify - the proper configuration and basic functionality of all the - collectd plugins as described in the Project Release Plan - dependencies: - installer: 'apex' - scenario: 'bar' - run: - module: 'baro_tests.barometer' - class: 'BarometerCollectd' - - - - case_name: fds - project_name: fastdatastacks - criteria: 100 - blocking: false - description: >- - Test Suite for the OpenDaylight SDN Controller when GBP - features are installed. It integrates some test suites from - upstream using Robot as the test framework. - dependencies: - installer: 'apex' - scenario: 'odl.*-fdio' - run: - module: 'functest.opnfv_tests.sdn.odl.odl' - class: 'ODLTests' - args: - suites: - - /src/fds/testing/robot - - - - name: components - order: 3 - ci_loop: 'weekly' - description: >- - Extensive testing of OpenStack API. - testcases: - - - case_name: tempest_full_parallel - project_name: functest - criteria: 80 - blocking: false - description: >- - The list of test cases is generated by - Tempest automatically and depends on the parameters of - the OpenStack deplopyment. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.tempest.tempest' - class: 'TempestFullParallel' - - - - case_name: rally_full - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case runs the full suite of scenarios of the - OpenStack Rally suite using several threads and iterations. - dependencies: - installer: '^((?!netvirt).)*$' - scenario: '' - run: - module: 'functest.opnfv_tests.openstack.rally.rally' - class: 'RallyFull' - - - - name: vnf - order: 4 - ci_loop: '(daily)|(weekly)' - description: >- - Collection of VNF test cases. - testcases: - - - case_name: cloudify_ims - project_name: functest - criteria: 80 - blocking: false - description: >- - This test case deploys an OpenSource vIMS solution from - Clearwater using the Cloudify orchestrator. It also runs - some signaling traffic. - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.ims.cloudify_ims' - class: 'CloudifyIms' - - - - case_name: vyos_vrouter - project_name: functest - criteria: 100 - blocking: false - description: >- - This test case is vRouter testing. - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.router.cloudify_vrouter' - class: 'CloudifyVrouter' - - - - case_name: orchestra_openims - project_name: orchestra - enabled: false - criteria: 100 - blocking: false - description: >- - OpenIMS VNF deployment with Open Baton (Orchestra) - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.ims.orchestra_openims' - class: 'OpenImsVnf' - - - - case_name: orchestra_clearwaterims - project_name: orchestra - enabled: false - criteria: 100 - blocking: false - description: >- - ClearwaterIMS VNF deployment with Open Baton (Orchestra) - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: - 'functest.opnfv_tests.vnf.ims.orchestra_clearwaterims' - class: 'ClearwaterImsVnf' - - - - case_name: juju_epc - project_name: functest - criteria: 100 - blocking: false - description: >- - vEPC validation with Juju as VNF manager and ABoT as test - executor. - dependencies: - installer: '' - scenario: 'os-nosdn-nofeature-.*ha' - run: - module: 'functest.opnfv_tests.vnf.epc.juju_epc' - class: 'JujuEpc' diff --git a/functest/ci/tier_builder.py b/functest/ci/tier_builder.py deleted file mode 100644 index 370ab94d..00000000 --- a/functest/ci/tier_builder.py +++ /dev/null @@ -1,106 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Ericsson AB and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""TierBuilder class to parse testcases config file""" - -import yaml - -import functest.ci.tier_handler as th - - -class TierBuilder(object): - # pylint: disable=missing-docstring - - def __init__(self, ci_installer, ci_scenario, testcases_file): - self.ci_installer = ci_installer - self.ci_scenario = ci_scenario - self.testcases_file = testcases_file - self.dic_tier_array = None - self.tier_objects = [] - self.testcases_yaml = None - self.generate_tiers() - - def read_test_yaml(self): - with open(self.testcases_file) as tc_file: - self.testcases_yaml = yaml.safe_load(tc_file) - - self.dic_tier_array = [] - for tier in self.testcases_yaml.get("tiers"): - self.dic_tier_array.append(tier) - - def generate_tiers(self): - if self.dic_tier_array is None: - self.read_test_yaml() - - del self.tier_objects[:] - for dic_tier in self.dic_tier_array: - tier = th.Tier( - name=dic_tier['name'], order=dic_tier['order'], - ci_loop=dic_tier['ci_loop'], - description=dic_tier['description']) - - for dic_testcase in dic_tier['testcases']: - installer = dic_testcase['dependencies']['installer'] - scenario = dic_testcase['dependencies']['scenario'] - dep = th.Dependency(installer, scenario) - - testcase = th.TestCase( - name=dic_testcase['case_name'], - enabled=dic_testcase.get('enabled', True), - dependency=dep, criteria=dic_testcase['criteria'], - blocking=dic_testcase['blocking'], - description=dic_testcase['description'], - project=dic_testcase['project_name']) - if (testcase.is_compatible(self.ci_installer, - self.ci_scenario) and - testcase.is_enabled()): - tier.add_test(testcase) - else: - tier.skip_test(testcase) - - self.tier_objects.append(tier) - - def get_tiers(self): - return self.tier_objects - - def get_tier_names(self): - tier_names = [] - for tier in self.tier_objects: - tier_names.append(tier.get_name()) - return tier_names - - def get_tier(self, tier_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].get_name() == tier_name: - return self.tier_objects[i] - return None - - def get_tier_name(self, test_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].is_test(test_name): - return self.tier_objects[i].name - return None - - def get_test(self, test_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].is_test(test_name): - return self.tier_objects[i].get_test(test_name) - return None - - def get_tests(self, tier_name): - for i in range(0, len(self.tier_objects)): - if self.tier_objects[i].get_name() == tier_name: - return self.tier_objects[i].get_tests() - return None - - def __str__(self): - output = "" - for i in range(0, len(self.tier_objects)): - output += str(self.tier_objects[i]) + "\n" - return output diff --git a/functest/ci/tier_handler.py b/functest/ci/tier_handler.py deleted file mode 100644 index 9fc3f24d..00000000 --- a/functest/ci/tier_handler.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Ericsson AB and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Tier and TestCase classes to wrap the testcases config file""" -# pylint: disable=missing-docstring - -import re -import textwrap - -import prettytable - - -LINE_LENGTH = 72 - - -def split_text(text, max_len): - words = text.split() - lines = [] - line = "" - for word in words: - if len(line) + len(word) < max_len - 1: - line += word + " " - else: - lines.append(line) - line = word + " " - if line != "": - lines.append(line) - return lines - - -class Tier(object): - - def __init__(self, name, order, ci_loop, description=""): - self.tests_array = [] - self.skipped_tests_array = [] - self.name = name - self.order = order - self.ci_loop = ci_loop - self.description = description - - def add_test(self, testcase): - self.tests_array.append(testcase) - - def skip_test(self, testcase): - self.skipped_tests_array.append(testcase) - - def get_tests(self): - array_tests = [] - for test in self.tests_array: - array_tests.append(test) - return array_tests - - def get_skipped_test(self): - return self.skipped_tests_array - - def get_test_names(self): - array_tests = [] - for test in self.tests_array: - array_tests.append(test.get_name()) - return array_tests - - def get_test(self, test_name): - if self.is_test(test_name): - for test in self.tests_array: - if test.get_name() == test_name: - return test - return None - - def is_test(self, test_name): - for test in self.tests_array: - if test.get_name() == test_name: - return True - return False - - def get_name(self): - return self.name - - def get_order(self): - return self.order - - def get_ci_loop(self): - return self.ci_loop - - def __str__(self): - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['tiers', 'order', 'CI Loop', 'description', - 'testcases']) - msg.add_row( - [self.name, self.order, self.ci_loop, - textwrap.fill(self.description, width=40), - textwrap.fill(' '.join([str(x.get_name( - )) for x in self.get_tests()]), width=40)]) - return msg.get_string() - - -class TestCase(object): - - def __init__(self, name, enabled, dependency, criteria, blocking, - description="", project=""): - # pylint: disable=too-many-arguments - self.name = name - self.enabled = enabled - self.dependency = dependency - self.criteria = criteria - self.blocking = blocking - self.description = description - self.project = project - - @staticmethod - def is_none(item): - return item is None or item == "" - - def is_compatible(self, ci_installer, ci_scenario): - try: - if not self.is_none(ci_installer): - if re.search(self.dependency.get_installer(), - ci_installer) is None: - return False - if not self.is_none(ci_scenario): - if re.search(self.dependency.get_scenario(), - ci_scenario) is None: - return False - return True - except TypeError: - return False - - def get_name(self): - return self.name - - def is_enabled(self): - return self.enabled - - def get_criteria(self): - return self.criteria - - def is_blocking(self): - return self.blocking - - def get_project(self): - return self.project - - def __str__(self): - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'description', 'criteria', 'dependency']) - msg.add_row([self.name, textwrap.fill(self.description, width=40), - self.criteria, self.dependency]) - return msg.get_string() - - -class Dependency(object): - - def __init__(self, installer, scenario): - self.installer = installer - self.scenario = scenario - - def get_installer(self): - return self.installer - - def get_scenario(self): - return self.scenario - - def __str__(self): - delimitator = "\n" if self.get_installer( - ) and self.get_scenario() else "" - return "{}{}{}".format(self.get_installer(), delimitator, - self.get_scenario()) diff --git a/functest/core/__init__.py b/functest/core/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/core/feature.py b/functest/core/feature.py deleted file mode 100644 index 65fd5a08..00000000 --- a/functest/core/feature.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 ZTE Corp and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent classes of all Functest Features. - -Feature is considered as TestCase offered by Third-party. It offers -helpers to run any python method or any bash command. -""" - -import logging -import subprocess -import time - -import functest.core.testcase as base - -__author__ = ("Serena Feng , " - "Cedric Ollivier ") - - -class Feature(base.TestCase): - """Base model for single feature.""" - - __logger = logging.getLogger(__name__) - dir_results = "/home/opnfv/functest/results" - - def __init__(self, **kwargs): - super(Feature, self).__init__(**kwargs) - self.result_file = "{}/{}.log".format(self.dir_results, self.case_name) - try: - module = kwargs['run']['module'] - self.logger = logging.getLogger(module) - except KeyError: - self.__logger.warning( - "Cannot get module name %s. Using %s as fallback", - kwargs, self.case_name) - self.logger = logging.getLogger(self.case_name) - handler = logging.StreamHandler() - handler.setLevel(logging.WARN) - self.logger.addHandler(handler) - handler = logging.FileHandler(self.result_file) - handler.setLevel(logging.DEBUG) - self.logger.addHandler(handler) - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - - def execute(self, **kwargs): - """Execute the Python method. - - The subclasses must override the default implementation which - is false on purpose. - - The new implementation must return 0 if success or anything - else if failure. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - -1. - """ - # pylint: disable=unused-argument,no-self-use - return -1 - - def run(self, **kwargs): - """Run the feature. - - It allows executing any Python method by calling execute(). - - It sets the following attributes required to push the results - to DB: - - * result, - * start_time, - * stop_time. - - It doesn't fulfill details when pushing the results to the DB. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_OK if execute() returns 0, - TestCase.EX_RUN_ERROR otherwise. - """ - self.start_time = time.time() - exit_code = base.TestCase.EX_RUN_ERROR - self.result = 0 - try: - if self.execute(**kwargs) == 0: - exit_code = base.TestCase.EX_OK - self.result = 100 - except Exception: # pylint: disable=broad-except - self.__logger.exception("%s FAILED", self.project_name) - self.__logger.info("Test result is stored in '%s'", self.result_file) - self.stop_time = time.time() - return exit_code - - -class BashFeature(Feature): - """Class designed to run any bash command.""" - - __logger = logging.getLogger(__name__) - - def execute(self, **kwargs): - """Execute the cmd passed as arg - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - 0 if cmd returns 0, - -1 otherwise. - """ - ret = -1 - try: - cmd = kwargs["cmd"] - with open(self.result_file, 'w+') as f_stdout: - proc = subprocess.Popen(cmd.split(), stdout=f_stdout, - stderr=subprocess.STDOUT) - ret = proc.wait() - if ret != 0: - self.__logger.error("Execute command: %s failed", cmd) - except KeyError: - self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs) - return ret diff --git a/functest/core/robotframework.py b/functest/core/robotframework.py deleted file mode 100644 index 54574a68..00000000 --- a/functest/core/robotframework.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define classes required to run any Robot suites.""" - -from __future__ import division - -import errno -import logging -import os - -import robot.api -from robot.errors import RobotError -import robot.run -from robot.utils.robottime import timestamp_to_secs -from six import StringIO - -from functest.core import testcase - -__author__ = "Cedric Ollivier " - - -class ResultVisitor(robot.api.ResultVisitor): - """Visitor to get result details.""" - - def __init__(self): - self._data = [] - - def visit_test(self, test): - output = {} - output['name'] = test.name - output['parent'] = test.parent.name - output['status'] = test.status - output['starttime'] = test.starttime - output['endtime'] = test.endtime - output['critical'] = test.critical - output['text'] = test.message - output['elapsedtime'] = test.elapsedtime - self._data.append(output) - - def get_data(self): - """Get the details of the result.""" - return self._data - - -class RobotFramework(testcase.TestCase): - """RobotFramework runner.""" - - __logger = logging.getLogger(__name__) - dir_results = "/home/opnfv/functest/results" - - def __init__(self, **kwargs): - self.res_dir = os.path.join(self.dir_results, 'robot') - self.xml_file = os.path.join(self.res_dir, 'output.xml') - super(RobotFramework, self).__init__(**kwargs) - - def parse_results(self): - """Parse output.xml and get the details in it.""" - result = robot.api.ExecutionResult(self.xml_file) - visitor = ResultVisitor() - result.visit(visitor) - try: - self.result = 100 * ( - result.suite.statistics.critical.passed / - result.suite.statistics.critical.total) - except ZeroDivisionError: - self.__logger.error("No test has been run") - self.start_time = timestamp_to_secs(result.suite.starttime) - self.stop_time = timestamp_to_secs(result.suite.endtime) - self.details = {} - self.details['description'] = result.suite.name - self.details['tests'] = visitor.get_data() - - def run(self, **kwargs): - """Run the RobotFramework suites - - Here are the steps: - * create the output directories if required, - * get the results in output.xml, - * delete temporary files. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - EX_OK if all suites ran well. - EX_RUN_ERROR otherwise. - """ - try: - suites = kwargs["suites"] - variable = kwargs.get("variable", []) - variablefile = kwargs.get("variablefile", []) - except KeyError: - self.__logger.exception("Mandatory args were not passed") - return self.EX_RUN_ERROR - try: - os.makedirs(self.res_dir) - except OSError as ex: - if ex.errno != errno.EEXIST: - self.__logger.exception("Cannot create %s", self.res_dir) - return self.EX_RUN_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("Cannot create %s", self.res_dir) - return self.EX_RUN_ERROR - stream = StringIO() - robot.run(*suites, variable=variable, variablefile=variablefile, - output=self.xml_file, log='NONE', - report='NONE', stdout=stream) - self.__logger.info("\n" + stream.getvalue()) - self.__logger.info("Results were successfully generated") - try: - self.parse_results() - self.__logger.info("Results were successfully parsed") - except RobotError as ex: - self.__logger.error("Run suites before publishing: %s", ex.message) - return self.EX_RUN_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("Cannot parse results") - return self.EX_RUN_ERROR - return self.EX_OK diff --git a/functest/core/testcase.py b/functest/core/testcase.py deleted file mode 100644 index e8bb1409..00000000 --- a/functest/core/testcase.py +++ /dev/null @@ -1,227 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class of all Functest TestCases.""" - -from datetime import datetime -import json -import logging -import os -import re -import requests - -from functest.utils import decorators -from functest.utils import env - - -import prettytable - - -__author__ = "Cedric Ollivier " - - -class TestCase(object): - """Base model for single test case.""" - - EX_OK = os.EX_OK - """everything is OK""" - - EX_RUN_ERROR = os.EX_SOFTWARE - """run() failed""" - - EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1 - """push_to_db() failed""" - - EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2 - """results are false""" - - _job_name_rule = "(dai|week)ly-(.+?)-[0-9]*" - _headers = {'Content-Type': 'application/json'} - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - self.details = {} - self.project_name = kwargs.get('project_name', 'functest') - self.case_name = kwargs.get('case_name', '') - self.criteria = kwargs.get('criteria', 100) - self.result = 0 - self.start_time = 0 - self.stop_time = 0 - - def __str__(self): - try: - assert self.project_name - assert self.case_name - result = 'PASS' if(self.is_successful( - ) == TestCase.EX_OK) else 'FAIL' - msg = prettytable.PrettyTable( - header_style='upper', padding_width=5, - field_names=['test case', 'project', 'duration', - 'result']) - msg.add_row([self.case_name, self.project_name, - self.get_duration(), result]) - return msg.get_string() - except AssertionError: - self.__logger.error("We cannot print invalid objects") - return super(TestCase, self).__str__() - - def get_duration(self): - """Return the duration of the test case. - - Returns: - duration if start_time and stop_time are set - "XX:XX" otherwise. - """ - try: - assert self.start_time - assert self.stop_time - if self.stop_time < self.start_time: - return "XX:XX" - return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod( - self.stop_time - self.start_time, 60)) - except Exception: # pylint: disable=broad-except - self.__logger.error("Please run test before getting the duration") - return "XX:XX" - - def is_successful(self): - """Interpret the result of the test case. - - It allows getting the result of TestCase. It completes run() - which only returns the execution status. - - It can be overriden if checking result is not suitable. - - Returns: - TestCase.EX_OK if result is 'PASS'. - TestCase.EX_TESTCASE_FAILED otherwise. - """ - try: - assert self.criteria - assert self.result is not None - if (not isinstance(self.result, str) and - not isinstance(self.criteria, str)): - if self.result >= self.criteria: - return TestCase.EX_OK - else: - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - # In this case criteria is unread. - self.__logger.warning( - "Please update result which must be an int!") - if self.result == 'PASS': - return TestCase.EX_OK - except AssertionError: - self.__logger.error("Please run test before checking the results") - return TestCase.EX_TESTCASE_FAILED - - def run(self, **kwargs): - """Run the test case. - - It allows running TestCase and getting its execution - status. - - The subclasses must override the default implementation which - is false on purpose. - - The new implementation must set the following attributes to - push the results to DB: - - * result, - * start_time, - * stop_time. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_RUN_ERROR. - """ - # pylint: disable=unused-argument - self.__logger.error("Run must be implemented") - return TestCase.EX_RUN_ERROR - - @decorators.can_dump_request_to_file - def push_to_db(self): - """Push the results of the test case to the DB. - - It allows publishing the results and checking the status. - - It could be overriden if the common implementation is not - suitable. - - The following attributes must be set before pushing the results to DB: - - * project_name, - * case_name, - * result, - * start_time, - * stop_time. - - The next vars must be set in env: - - * TEST_DB_URL, - * INSTALLER_TYPE, - * DEPLOY_SCENARIO, - * NODE_NAME, - * BUILD_TAG. - - Returns: - TestCase.EX_OK if results were pushed to DB. - TestCase.EX_PUSH_TO_DB_ERROR otherwise. - """ - try: - assert self.project_name - assert self.case_name - assert self.start_time - assert self.stop_time - url = env.get('TEST_DB_URL') - data = {"project_name": self.project_name, - "case_name": self.case_name, - "details": self.details} - data["installer"] = env.get('INSTALLER_TYPE') - data["scenario"] = env.get('DEPLOY_SCENARIO') - data["pod_name"] = env.get('NODE_NAME') - data["build_tag"] = env.get('BUILD_TAG') - data["criteria"] = 'PASS' if self.is_successful( - ) == TestCase.EX_OK else 'FAIL' - data["start_date"] = datetime.fromtimestamp( - self.start_time).strftime('%Y-%m-%d %H:%M:%S') - data["stop_date"] = datetime.fromtimestamp( - self.stop_time).strftime('%Y-%m-%d %H:%M:%S') - try: - data["version"] = re.search( - TestCase._job_name_rule, - env.get('BUILD_TAG')).group(2) - except Exception: # pylint: disable=broad-except - data["version"] = "unknown" - req = requests.post( - url, data=json.dumps(data, sort_keys=True), - headers=self._headers) - req.raise_for_status() - self.__logger.info( - "The results were successfully pushed to DB %s", url) - except AssertionError: - self.__logger.exception( - "Please run test before publishing the results") - return TestCase.EX_PUSH_TO_DB_ERROR - except requests.exceptions.HTTPError: - self.__logger.exception("The HTTP request raises issues") - return TestCase.EX_PUSH_TO_DB_ERROR - except Exception: # pylint: disable=broad-except - self.__logger.exception("The results cannot be pushed to DB") - return TestCase.EX_PUSH_TO_DB_ERROR - return TestCase.EX_OK - - def clean(self): - """Clean the resources. - - It can be overriden if resources must be deleted after - running the test case. - """ diff --git a/functest/core/unit.py b/functest/core/unit.py deleted file mode 100644 index 61b5a58d..00000000 --- a/functest/core/unit.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Cable Television Laboratories, Inc. and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class to run unittest.TestSuite as TestCase.""" - -from __future__ import division - -import logging -import time -import unittest - -import six - -from functest.core import testcase - -__author__ = ("Steven Pisarski , " - "Cedric Ollivier ") - - -class Suite(testcase.TestCase): - """Base model for running unittest.TestSuite.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(Suite, self).__init__(**kwargs) - self.suite = None - - def run(self, **kwargs): - """Run the test suite. - - It allows running any unittest.TestSuite and getting its - execution status. - - By default, it runs the suite defined as instance attribute. - It can be overriden by passing name as arg. It must - conform with TestLoader.loadTestsFromName(). - - It sets the following attributes required to push the results - to DB: - - * result, - * start_time, - * stop_time, - * details. - - Args: - kwargs: Arbitrary keyword arguments. - - Returns: - TestCase.EX_OK if any TestSuite has been run, - TestCase.EX_RUN_ERROR otherwise. - """ - try: - name = kwargs["name"] - try: - self.suite = unittest.TestLoader().loadTestsFromName(name) - except ImportError: - self.__logger.error("Can not import %s", name) - return testcase.TestCase.EX_RUN_ERROR - except KeyError: - pass - try: - assert self.suite - self.start_time = time.time() - stream = six.StringIO() - result = unittest.TextTestRunner( - stream=stream, verbosity=2).run(self.suite) - self.__logger.debug("\n\n%s", stream.getvalue()) - self.stop_time = time.time() - self.details = { - "testsRun": result.testsRun, - "failures": len(result.failures), - "errors": len(result.errors), - "stream": stream.getvalue()} - self.result = 100 * ( - (result.testsRun - (len(result.failures) + - len(result.errors))) / - result.testsRun) - return testcase.TestCase.EX_OK - except AssertionError: - self.__logger.error("No suite is defined") - return testcase.TestCase.EX_RUN_ERROR - except ZeroDivisionError: - self.__logger.error("No test has been run") - return testcase.TestCase.EX_RUN_ERROR diff --git a/functest/core/vnf.py b/functest/core/vnf.py deleted file mode 100644 index cf20492b..00000000 --- a/functest/core/vnf.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the parent class of all VNF TestCases.""" - -import logging -import time -import uuid - -from snaps.config.user import UserConfig -from snaps.config.project import ProjectConfig -from snaps.openstack.create_user import OpenStackUser -from snaps.openstack.create_project import OpenStackProject -from snaps.openstack.tests import openstack_tests - -from functest.core import testcase -from functest.utils import constants - -__author__ = ("Morgan Richomme , " - "Valentin Boucher ") - - -class VnfPreparationException(Exception): - """Raise when VNF preparation cannot be executed.""" - - -class OrchestratorDeploymentException(Exception): - """Raise when orchestrator cannot be deployed.""" - - -class VnfDeploymentException(Exception): - """Raise when VNF cannot be deployed.""" - - -class VnfTestException(Exception): - """Raise when VNF cannot be tested.""" - - -class VnfOnBoarding(testcase.TestCase): - # pylint: disable=too-many-instance-attributes - """Base model for VNF test cases.""" - - __logger = logging.getLogger(__name__) - - def __init__(self, **kwargs): - super(VnfOnBoarding, self).__init__(**kwargs) - self.uuid = uuid.uuid4() - self.user_name = "{}-{}".format(self.case_name, self.uuid) - self.tenant_name = "{}-{}".format(self.case_name, self.uuid) - self.snaps_creds = {} - self.created_object = [] - self.os_project = None - self.tenant_description = "Created by OPNFV Functest: {}".format( - self.case_name) - - def run(self, **kwargs): - """ - Run of the VNF test case: - - * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...), - * Deploy the VNF, - * Perform tests on the VNF - - A VNF test case is successfull when the 3 steps are PASS - If one of the step is FAIL, the test case is FAIL - - Returns: - TestCase.EX_OK if result is 'PASS'. - TestCase.EX_TESTCASE_FAILED otherwise. - """ - self.start_time = time.time() - - try: - self.prepare() - if (self.deploy_orchestrator() and - self.deploy_vnf() and - self.test_vnf()): - self.stop_time = time.time() - # Calculation with different weight depending on the steps TODO - self.result = 100 - return testcase.TestCase.EX_OK - self.result = 0 - self.stop_time = time.time() - return testcase.TestCase.EX_TESTCASE_FAILED - except Exception: # pylint: disable=broad-except - self.stop_time = time.time() - self.__logger.exception("Exception on VNF testing") - return testcase.TestCase.EX_TESTCASE_FAILED - - def prepare(self): - """ - Prepare the environment for VNF testing: - - * Creation of a user, - * Creation of a tenant, - * Allocation admin role to the user on this tenant - - Returns base.TestCase.EX_OK if preparation is successfull - - Raise VnfPreparationException in case of problem - """ - try: - self.__logger.info( - "Prepare VNF: %s, description: %s", self.case_name, - self.tenant_description) - snaps_creds = openstack_tests.get_credentials( - os_env_file=constants.ENV_FILE) - - self.os_project = OpenStackProject( - snaps_creds, - ProjectConfig( - name=self.tenant_name, - description=self.tenant_description - )) - self.os_project.create() - self.created_object.append(self.os_project) - user_creator = OpenStackUser( - snaps_creds, - UserConfig( - name=self.user_name, - password=str(uuid.uuid4()), - roles={'admin': self.tenant_name})) - user_creator.create() - self.created_object.append(user_creator) - self.snaps_creds = user_creator.get_os_creds(self.tenant_name) - - return testcase.TestCase.EX_OK - except Exception: # pylint: disable=broad-except - self.__logger.exception("Exception raised during VNF preparation") - raise VnfPreparationException - - def deploy_orchestrator(self): - """ - Deploy an orchestrator (optional). - - If this method is overriden then raise orchestratorDeploymentException - if error during orchestrator deployment - """ - self.__logger.info("Deploy orchestrator (if necessary)") - return True - - def deploy_vnf(self): - """ - Deploy the VNF - - This function MUST be implemented by vnf test cases. - The details section MAY be updated in the vnf test cases. - - The deployment can be executed via a specific orchestrator - or using build-in orchestrators such as heat, OpenBaton, cloudify, - juju, onap, ... - - Returns: - True if the VNF is properly deployed - False if the VNF is not deployed - - Raise VnfDeploymentException if error during VNF deployment - """ - self.__logger.error("VNF must be deployed") - raise VnfDeploymentException - - def test_vnf(self): - """ - Test the VNF - - This function MUST be implemented by vnf test cases. - The details section MAY be updated in the vnf test cases. - - Once a VNF is deployed, it is assumed that specific test suite can be - run to validate the VNF. - Please note that the same test suite can be used on several test case - (e.g. clearwater test suite can be used whatever the orchestrator used - for the deployment) - - Returns: - True if VNF tests are PASS - False if test suite is FAIL - - Raise VnfTestException if error during VNF test - """ - self.__logger.error("VNF must be tested") - raise VnfTestException - - def clean(self): - """ - Clean VNF test case. - - It is up to the test providers to delete resources used for the tests. - By default we clean: - - * the user, - * the tenant - """ - self.__logger.info('Removing the VNF resources ..') - for creator in reversed(self.created_object): - try: - creator.clean() - except Exception as exc: # pylint: disable=broad-except - self.__logger.error('Unexpected error cleaning - %s', exc) diff --git a/functest/energy/__init__.py b/functest/energy/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/energy/energy.py b/functest/energy/energy.py deleted file mode 100644 index a2652211..00000000 --- a/functest/energy/energy.py +++ /dev/null @@ -1,336 +0,0 @@ -#!/usr/bin/env python -# -*- coding: UTF-8 -*- - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""This module manages calls to Energy recording API.""" - -import json -import logging -import traceback - -from functools import wraps -import requests -from six.moves import urllib - -from functest.utils import env - - -def finish_session(current_scenario): - """Finish a recording session.""" - if current_scenario is None: - EnergyRecorder.stop() - else: - EnergyRecorder.logger.debug("Restoring previous scenario (%s/%s)", - current_scenario["scenario"], - current_scenario["step"]) - EnergyRecorder.submit_scenario( - current_scenario["scenario"], - current_scenario["step"] - ) - - -def enable_recording(method): - """ - Record energy during method execution. - - Decorator to record energy during "method" exection. - - param method: Method to suround with start and stop - :type method: function - - .. note:: "method" should belong to a class having a "case_name" - attribute - """ - @wraps(method) - def wrapper(*args): - """ - Record energy during method execution (implementation). - - Wrapper for decorator to handle method arguments. - """ - current_scenario = EnergyRecorder.get_current_scenario() - EnergyRecorder.start(args[0].case_name) - try: - return_value = method(*args) - finish_session(current_scenario) - except Exception as exc: # pylint: disable=broad-except - EnergyRecorder.logger.exception(exc) - finish_session(current_scenario) - raise exc - return return_value - return wrapper - - -# Class to manage energy recording sessions -class EnergyRecorder(object): - """Manage Energy recording session.""" - - logger = logging.getLogger(__name__) - # Energy recording API connectivity settings - # see load_config method - energy_recorder_api = None - - # Default initial step - INITIAL_STEP = "running" - - # Default connection timeout - CONNECTION_TIMEOUT = 4 - - @staticmethod - def load_config(): - """ - Load connectivity settings from yaml. - - Load connectivity settings to Energy recording API - Use functest global config yaml file - (see functest_utils.get_functest_config) - """ - # Singleton pattern for energy_recorder_api static member - # Load only if not previouly done - if EnergyRecorder.energy_recorder_api is None: - assert env.get('NODE_NAME') - assert env.get('ENERGY_RECORDER_API_URL') - environment = env.get('NODE_NAME') - energy_recorder_uri = env.get( - 'ENERGY_RECORDER_API_URL') - - # Creds - creds_usr = env.get("ENERGY_RECORDER_API_USER") - creds_pass = env.get("ENERGY_RECORDER_API_PASSWORD") - - uri_comp = "/recorders/environment/" - uri_comp += urllib.parse.quote_plus(environment) - - if creds_usr and creds_pass: - energy_recorder_api_auth = (creds_usr, creds_pass) - else: - energy_recorder_api_auth = None - - try: - resp = requests.get(energy_recorder_uri + "/monitoring/ping", - auth=energy_recorder_api_auth, - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT) - api_available = json.loads(resp.text)["status"] == "OK" - EnergyRecorder.logger.info( - "API recorder available at : %s", - energy_recorder_uri + uri_comp) - except Exception as exc: # pylint: disable=broad-except - EnergyRecorder.logger.info( - "Energy recorder API is not available, cause=%s", - str(exc)) - api_available = False - # Final config - EnergyRecorder.energy_recorder_api = { - "uri": energy_recorder_uri + uri_comp, - "auth": energy_recorder_api_auth, - "available": api_available - } - return EnergyRecorder.energy_recorder_api["available"] - - @staticmethod - def submit_scenario(scenario, step): - """ - Submit a complet scenario definition to Energy recorder API. - - param scenario: Scenario name - :type scenario: string - param step: Step name - :type step: string - """ - try: - return_status = True - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Submitting scenario (%s/%s)", - scenario, step) - - # Create API payload - payload = { - "step": step, - "scenario": scenario - } - # Call API to start energy recording - response = requests.post( - EnergyRecorder.energy_recorder_api["uri"], - data=json.dumps(payload), - auth=EnergyRecorder.energy_recorder_api["auth"], - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code != 200: - EnergyRecorder.logger.error( - "Error while submitting scenario\n%s", - response.text) - return_status = False - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "submit_scenario: Unable to connect energy recorder API") - return_status = False - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while submitting scenarion to energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def start(scenario): - """ - Start a recording session for scenario. - - param scenario: Starting scenario - :type scenario: string - """ - return_status = True - try: - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Starting recording") - return_status = EnergyRecorder.submit_scenario( - scenario, - EnergyRecorder.INITIAL_STEP - ) - - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while starting energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def stop(): - """Stop current recording session.""" - return_status = True - try: - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Stopping recording") - - # Call API to stop energy recording - response = requests.delete( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code != 200: - EnergyRecorder.logger.error( - "Error while stating energy recording session\n%s", - response.text) - return_status = False - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "stop: Unable to connect energy recorder API") - return_status = False - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while stoping energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def set_step(step): - """Notify energy recording service of current step of the testcase.""" - return_status = True - try: - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Setting step") - - # Create API payload - payload = { - "step": step, - } - - # Call API to define step - response = requests.post( - EnergyRecorder.energy_recorder_api["uri"] + "/step", - data=json.dumps(payload), - auth=EnergyRecorder.energy_recorder_api["auth"], - headers={ - 'content-type': 'application/json' - }, - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code != 200: - EnergyRecorder.logger.error( - "Error while setting current step of testcase\n%s", - response.text) - return_status = False - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "set_step: Unable to connect energy recorder API") - return_status = False - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while setting step on energy recorder API\n%s", - traceback.format_exc() - ) - return_status = False - return return_status - - @staticmethod - def get_current_scenario(): - """Get current running scenario (if any, None else).""" - return_value = None - try: - # Ensure that connectyvity settings are loaded - if EnergyRecorder.load_config(): - EnergyRecorder.logger.debug("Getting current scenario") - - # Call API get running scenario - response = requests.get( - EnergyRecorder.energy_recorder_api["uri"], - auth=EnergyRecorder.energy_recorder_api["auth"], - timeout=EnergyRecorder.CONNECTION_TIMEOUT - ) - if response.status_code == 200: - return_value = json.loads(response.text) - elif response.status_code == 404: - EnergyRecorder.logger.info( - "No current running scenario at %s", - EnergyRecorder.energy_recorder_api["uri"]) - return_value = None - else: - EnergyRecorder.logger.error( - "Error while getting current scenario\n%s", - response.text) - return_value = None - except requests.exceptions.ConnectionError: - EnergyRecorder.logger.warning( - "get_currernt_sceario: Unable to connect energy recorder API") - return_value = None - except Exception: # pylint: disable=broad-except - # Default exception handler to ensure that method - # is safe for caller - EnergyRecorder.logger.info( - "Error while getting current scenario from energy recorder API" - "\n%s", traceback.format_exc() - ) - return_value = None - return return_value diff --git a/functest/tests/__init__.py b/functest/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/tests/unit/__init__.py b/functest/tests/unit/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/tests/unit/ci/__init__.py b/functest/tests/unit/ci/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/tests/unit/ci/test_run_tests.py b/functest/tests/unit/ci/test_run_tests.py deleted file mode 100644 index b8dca20c..00000000 --- a/functest/tests/unit/ci/test_run_tests.py +++ /dev/null @@ -1,267 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest -import os - -import mock - -from functest.ci import run_tests -from functest.core.testcase import TestCase - - -class FakeModule(TestCase): - - def run(self, **kwargs): - return TestCase.EX_OK - - -class RunTestsTesting(unittest.TestCase): - - def setUp(self): - self.runner = run_tests.Runner() - mock_test_case = mock.Mock() - mock_test_case.is_successful.return_value = TestCase.EX_OK - self.runner.executed_test_cases['test1'] = mock_test_case - self.runner.executed_test_cases['test2'] = mock_test_case - self.sep = 'test_sep' - self.creds = {'OS_AUTH_URL': 'http://test_ip:test_port/v2.0', - 'OS_USERNAME': 'test_os_username', - 'OS_TENANT_NAME': 'test_tenant', - 'OS_PASSWORD': 'test_password'} - self.test = {'test_name': 'test_name'} - self.tier = mock.Mock() - test1 = mock.Mock() - test1.get_name.return_value = 'test1' - test2 = mock.Mock() - test2.get_name.return_value = 'test2' - attrs = {'get_name.return_value': 'test_tier', - 'get_tests.return_value': [test1, test2], - 'get_ci_loop.return_value': 'test_ci_loop', - 'get_test_names.return_value': ['test1', 'test2']} - self.tier.configure_mock(**attrs) - - self.tiers = mock.Mock() - attrs = {'get_tiers.return_value': [self.tier]} - self.tiers.configure_mock(**attrs) - - self.run_tests_parser = run_tests.RunTestsParser() - - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test') - def test_get_run_dict(self, *args): - retval = {'run': mock.Mock()} - args[0].return_value = retval - self.assertEqual(self.runner.get_run_dict('test_name'), retval['run']) - args[0].assert_called_once_with('test_name') - - @mock.patch('functest.ci.run_tests.LOGGER.error') - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test', - return_value=None) - def test_get_run_dict_config_ko(self, *args): - testname = 'test_name' - self.assertEqual(self.runner.get_run_dict(testname), None) - args[0].return_value = {} - self.assertEqual(self.runner.get_run_dict(testname), None) - calls = [mock.call(testname), mock.call(testname)] - args[0].assert_has_calls(calls) - calls = [mock.call("Cannot get %s's config options", testname), - mock.call("Cannot get %s's config options", testname)] - args[1].assert_has_calls(calls) - - @mock.patch('functest.ci.run_tests.LOGGER.exception') - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test', - side_effect=Exception) - def test_get_run_dict_exception(self, *args): - testname = 'test_name' - self.assertEqual(self.runner.get_run_dict(testname), None) - args[1].assert_called_once_with( - "Cannot get %s's config options", testname) - - def _test_source_envfile(self, msg, key='OS_TENANT_NAME', value='admin'): - try: - del os.environ[key] - except Exception: # pylint: disable=broad-except - pass - envfile = 'rc_file' - with mock.patch('six.moves.builtins.open', - mock.mock_open(read_data=msg)) as mock_method,\ - mock.patch('os.path.isfile', return_value=True): - mock_method.return_value.__iter__ = lambda self: iter( - self.readline, '') - self.runner.source_envfile(envfile) - mock_method.assert_called_once_with(envfile, 'r') - self.assertEqual(os.environ[key], value) - - def test_source_envfile(self): - self._test_source_envfile('OS_TENANT_NAME=admin') - self._test_source_envfile('OS_TENANT_NAME= admin') - self._test_source_envfile('OS_TENANT_NAME = admin') - self._test_source_envfile('OS_TENANT_NAME = "admin"') - self._test_source_envfile('export OS_TENANT_NAME=admin') - self._test_source_envfile('export OS_TENANT_NAME =admin') - self._test_source_envfile('export OS_TENANT_NAME = admin') - self._test_source_envfile('export OS_TENANT_NAME = "admin"') - # This test will fail as soon as rc_file is fixed - self._test_source_envfile( - 'export "\'OS_TENANT_NAME\'" = "\'admin\'"') - - def test_get_dict_by_test(self): - with mock.patch('six.moves.builtins.open', mock.mock_open()), \ - mock.patch('yaml.safe_load') as mock_yaml: - mock_obj = mock.Mock() - testcase_dict = {'case_name': 'testname', - 'criteria': 50} - attrs = {'get.return_value': [{'testcases': [testcase_dict]}]} - mock_obj.configure_mock(**attrs) - mock_yaml.return_value = mock_obj - self.assertDictEqual( - run_tests.Runner.get_dict_by_test('testname'), - testcase_dict) - - @mock.patch('functest.ci.run_tests.Runner.get_run_dict', - return_value=None) - def test_run_tests_import_exception(self, *args): - mock_test = mock.Mock() - kwargs = {'get_name.return_value': 'test_name', - 'needs_clean.return_value': False} - mock_test.configure_mock(**kwargs) - with self.assertRaises(Exception) as context: - self.runner.run_test(mock_test) - args[0].assert_called_with('test_name') - msg = "Cannot import the class for the test case." - self.assertTrue(msg in str(context.exception)) - - @mock.patch('importlib.import_module', name="module", - return_value=mock.Mock(test_class=mock.Mock( - side_effect=FakeModule))) - @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test') - def test_run_tests_default(self, *args): - mock_test = mock.Mock() - kwargs = {'get_name.return_value': 'test_name', - 'needs_clean.return_value': True} - mock_test.configure_mock(**kwargs) - test_run_dict = {'module': 'test_module', - 'class': 'test_class'} - with mock.patch('functest.ci.run_tests.Runner.get_run_dict', - return_value=test_run_dict): - self.runner.clean_flag = True - self.runner.run_test(mock_test) - args[0].assert_called_with('test_name') - args[1].assert_called_with('test_module') - self.assertEqual(self.runner.overall_result, - run_tests.Result.EX_OK) - - @mock.patch('functest.ci.run_tests.Runner.run_test', - return_value=TestCase.EX_OK) - def test_run_tier_default(self, *mock_methods): - self.assertEqual(self.runner.run_tier(self.tier), - run_tests.Result.EX_OK) - mock_methods[0].assert_called_with(mock.ANY) - - @mock.patch('functest.ci.run_tests.LOGGER.info') - def test_run_tier_missing_test(self, mock_logger_info): - self.tier.get_tests.return_value = None - self.assertEqual(self.runner.run_tier(self.tier), - run_tests.Result.EX_ERROR) - self.assertTrue(mock_logger_info.called) - - @mock.patch('functest.ci.run_tests.LOGGER.info') - @mock.patch('functest.ci.run_tests.Runner.run_tier') - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_run_all_default(self, *mock_methods): - os.environ['CI_LOOP'] = 'test_ci_loop' - self.runner.run_all() - mock_methods[1].assert_not_called() - self.assertTrue(mock_methods[2].called) - - @mock.patch('functest.ci.run_tests.LOGGER.info') - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_run_all_missing_tier(self, *mock_methods): - os.environ['CI_LOOP'] = 'loop_re_not_available' - self.runner.run_all() - self.assertTrue(mock_methods[1].called) - - @mock.patch('functest.ci.run_tests.Runner.source_envfile', - side_effect=Exception) - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_main_failed(self, *mock_methods): - kwargs = {'test': 'test_name', 'noclean': True, 'report': True} - args = {'get_tier.return_value': False, - 'get_test.return_value': False} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**args) - self.assertEqual(self.runner.main(**kwargs), - run_tests.Result.EX_ERROR) - mock_methods[1].assert_called_once_with() - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - @mock.patch('functest.ci.run_tests.Runner.run_test', - return_value=TestCase.EX_OK) - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_main_tier(self, *mock_methods): - mock_tier = mock.Mock() - test_mock = mock.Mock() - test_mock.get_name.return_value = 'test1' - args = {'get_name.return_value': 'tier_name', - 'get_tests.return_value': [test_mock]} - mock_tier.configure_mock(**args) - kwargs = {'test': 'tier_name', 'noclean': True, 'report': True} - args = {'get_tier.return_value': mock_tier, - 'get_test.return_value': None} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**args) - self.assertEqual(self.runner.main(**kwargs), - run_tests.Result.EX_OK) - mock_methods[1].assert_called() - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - @mock.patch('functest.ci.run_tests.Runner.run_test', - return_value=TestCase.EX_OK) - def test_main_test(self, *mock_methods): - kwargs = {'test': 'test_name', 'noclean': True, 'report': True} - args = {'get_tier.return_value': None, - 'get_test.return_value': 'test_name'} - self.runner.tiers = mock.Mock() - mock_methods[1].return_value = self.creds - self.runner.tiers.configure_mock(**args) - self.assertEqual(self.runner.main(**kwargs), - run_tests.Result.EX_OK) - mock_methods[0].assert_called_once_with('test_name') - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - @mock.patch('functest.ci.run_tests.Runner.run_all') - @mock.patch('functest.ci.run_tests.Runner.summary') - def test_main_all_tier(self, *args): - kwargs = {'get_tier.return_value': None, - 'get_test.return_value': None} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**kwargs) - self.assertEqual( - self.runner.main(test='all', noclean=True, report=True), - run_tests.Result.EX_OK) - args[0].assert_called_once_with(None) - args[1].assert_called_once_with() - args[2].assert_called_once_with() - - @mock.patch('functest.ci.run_tests.Runner.source_envfile') - def test_main_any_tier_test_ko(self, *args): - kwargs = {'get_tier.return_value': None, - 'get_test.return_value': None} - self.runner.tiers = mock.Mock() - self.runner.tiers.configure_mock(**kwargs) - self.assertEqual( - self.runner.main(test='any', noclean=True, report=True), - run_tests.Result.EX_ERROR) - args[0].assert_called_once_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/ci/test_tier_builder.py b/functest/tests/unit/ci/test_tier_builder.py deleted file mode 100644 index ef6a007b..00000000 --- a/functest/tests/unit/ci/test_tier_builder.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.ci import tier_builder - - -class TierBuilderTesting(unittest.TestCase): - - def setUp(self): - self.dependency = { - 'installer': 'test_installer', 'scenario': 'test_scenario'} - self.testcase = { - 'dependencies': self.dependency, 'enabled': 'true', - 'case_name': 'test_name', 'criteria': 'test_criteria', - 'blocking': 'test_blocking', 'description': 'test_desc', - 'project_name': 'project_name'} - self.dic_tier = { - 'name': 'test_tier', 'order': 'test_order', - 'ci_loop': 'test_ci_loop', 'description': 'test_desc', - 'testcases': [self.testcase]} - self.mock_yaml = mock.Mock() - attrs = {'get.return_value': [self.dic_tier]} - self.mock_yaml.configure_mock(**attrs) - - with mock.patch('functest.ci.tier_builder.yaml.safe_load', - return_value=self.mock_yaml), \ - mock.patch('six.moves.builtins.open', mock.mock_open()): - self.tierbuilder = tier_builder.TierBuilder( - 'test_installer', 'test_scenario', 'testcases_file') - self.tier_obj = self.tierbuilder.tier_objects[0] - - def test_get_tiers(self): - self.assertEqual(self.tierbuilder.get_tiers(), - [self.tier_obj]) - - def test_get_tier_names(self): - self.assertEqual(self.tierbuilder.get_tier_names(), - ['test_tier']) - - def test_get_tier_present_tier(self): - self.assertEqual(self.tierbuilder.get_tier('test_tier'), - self.tier_obj) - - def test_get_tier_missing_tier(self): - self.assertEqual(self.tierbuilder.get_tier('test_tier2'), - None) - - def test_get_test_present_test(self): - self.assertEqual(self.tierbuilder.get_test('test_name'), - self.tier_obj.get_test('test_name')) - - def test_get_test_missing_test(self): - self.assertEqual(self.tierbuilder.get_test('test_name2'), - None) - - def test_get_tests_present_tier(self): - self.assertEqual(self.tierbuilder.get_tests('test_tier'), - self.tier_obj.tests_array) - - def test_get_tests_missing_tier(self): - self.assertEqual(self.tierbuilder.get_tests('test_tier2'), - None) - - def test_get_tier_name_ok(self): - self.assertEqual(self.tierbuilder.get_tier_name('test_name'), - 'test_tier') - - def test_get_tier_name_ko(self): - self.assertEqual(self.tierbuilder.get_tier_name('test_name2'), None) - - def test_str(self): - message = str(self.tierbuilder) - self.assertTrue('test_tier' in message) - self.assertTrue('test_order' in message) - self.assertTrue('test_ci_loop' in message) - self.assertTrue('test_desc' in message) - self.assertTrue('test_name' in message) - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/ci/test_tier_handler.py b/functest/tests/unit/ci/test_tier_handler.py deleted file mode 100644 index 5e784128..00000000 --- a/functest/tests/unit/ci/test_tier_handler.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.ci import tier_handler - - -class TierHandlerTesting(unittest.TestCase): - # pylint: disable=too-many-public-methods - - def setUp(self): - self.test = mock.Mock() - attrs = {'get_name.return_value': 'test_name'} - self.test.configure_mock(**attrs) - self.mock_depend = mock.Mock() - attrs = {'get_scenario.return_value': 'test_scenario', - 'get_installer.return_value': 'test_installer'} - self.mock_depend.configure_mock(**attrs) - self.tier = tier_handler.Tier( - 'test_tier', 'test_order', 'test_ci_loop', description='test_desc') - self.testcase = tier_handler.TestCase( - 'test_name', 'true', self.mock_depend, 'test_criteria', - True, description='test_desc', project='project_name') - self.dependency = tier_handler.Dependency( - 'test_installer', 'test_scenario') - self.testcase.str = self.testcase.__str__() - self.dependency.str = self.dependency.__str__() - self.tier.str = self.tier.__str__() - - def test_split_text(self): - test_str = 'this is for testing' - self.assertEqual(tier_handler.split_text(test_str, 10), - ['this is ', 'for ', 'testing ']) - - def test_add_test(self): - self.tier.add_test(self.test) - self.assertEqual(self.tier.tests_array, [self.test]) - - def test_get_skipped_test1(self): - self.assertEqual(self.tier.get_skipped_test(), []) - - def test_get_skipped_test2(self): - self.tier.skip_test(self.test) - self.assertEqual(self.tier.get_skipped_test(), [self.test]) - - def test_get_tests(self): - self.tier.tests_array = [self.test] - self.assertEqual(self.tier.get_tests(), [self.test]) - - def test_get_test_names(self): - self.tier.tests_array = [self.test] - self.assertEqual(self.tier.get_test_names(), ['test_name']) - - def test_get_test(self): - self.tier.tests_array = [self.test] - with mock.patch.object(self.tier, 'is_test', return_value=True): - self.assertEqual(self.tier.get_test('test_name'), self.test) - - def test_get_test_missing_test(self): - self.tier.tests_array = [self.test] - with mock.patch.object(self.tier, 'is_test', return_value=False): - self.assertEqual(self.tier.get_test('test_name'), None) - - def test_get_name(self): - self.assertEqual(self.tier.get_name(), 'test_tier') - - def test_get_order(self): - self.assertEqual(self.tier.get_order(), 'test_order') - - def test_get_ci_loop(self): - self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop') - - def test_testcase_is_none_in_item(self): - self.assertEqual(tier_handler.TestCase.is_none("item"), False) - - def test_testcase_is_none_no_item(self): - self.assertEqual(tier_handler.TestCase.is_none(None), True) - - def test_testcase_is_compatible(self): - self.assertEqual( - self.testcase.is_compatible('test_installer', 'test_scenario'), - True) - - def test_testcase_is_compatible_2(self): - self.assertEqual( - self.testcase.is_compatible('missing_installer', 'test_scenario'), - False) - self.assertEqual( - self.testcase.is_compatible('test_installer', 'missing_scenario'), - False) - - @mock.patch('re.search', side_effect=TypeError) - def test_testcase_is_compatible3(self, *args): - self.assertEqual( - self.testcase.is_compatible('test_installer', 'test_scenario'), - False) - args[0].assert_called_once_with('test_installer', 'test_installer') - - def test_testcase_get_name(self): - self.assertEqual(self.tier.get_name(), 'test_tier') - - def test_testcase_is_enabled(self): - self.assertEqual(self.testcase.is_enabled(), 'true') - - def test_testcase_get_criteria(self): - self.assertEqual(self.testcase.get_criteria(), 'test_criteria') - - def test_testcase_is_blocking(self): - self.assertTrue(self.testcase.is_blocking()) - - def test_testcase_get_project(self): - self.assertEqual(self.testcase.get_project(), 'project_name') - - def test_testcase_get_order(self): - self.assertEqual(self.tier.get_order(), 'test_order') - - def test_testcase_get_ci_loop(self): - self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop') - - def test_dependency_get_installer(self): - self.assertEqual(self.dependency.get_installer(), 'test_installer') - - def test_dependency_get_scenario(self): - self.assertEqual(self.dependency.get_scenario(), 'test_scenario') - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/__init__.py b/functest/tests/unit/core/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/functest/tests/unit/core/test_feature.py b/functest/tests/unit/core/test_feature.py deleted file mode 100644 index 3219c726..00000000 --- a/functest/tests/unit/core/test_feature.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.core import feature -from functest.core import testcase - - -class FeatureTestingBase(unittest.TestCase): - - _case_name = "foo" - _project_name = "bar" - _repo = "dir_repo_bar" - _cmd = "run_bar_tests.py" - _output_file = '/home/opnfv/functest/results/foo.log' - feature = None - - @mock.patch('time.time', side_effect=[1, 2]) - def _test_run(self, status, mock_method=None): - self.assertEqual(self.feature.run(cmd=self._cmd), status) - if status == testcase.TestCase.EX_OK: - self.assertEqual(self.feature.result, 100) - else: - self.assertEqual(self.feature.result, 0) - mock_method.assert_has_calls([mock.call(), mock.call()]) - self.assertEqual(self.feature.start_time, 1) - self.assertEqual(self.feature.stop_time, 2) - - def test_logger_module_ko(self): - with mock.patch('six.moves.builtins.open'): - self.feature = feature.Feature( - project_name=self._project_name, case_name=self._case_name) - self.assertEqual(self.feature.logger.name, self._case_name) - - def test_logger_module(self): - with mock.patch('six.moves.builtins.open'): - self.feature = feature.Feature( - project_name=self._project_name, case_name=self._case_name, - run={'module': 'bar'}) - self.assertEqual(self.feature.logger.name, 'bar') - - -class FeatureTesting(FeatureTestingBase): - - def setUp(self): - # logging must be disabled else it calls time.time() - # what will break these unit tests. - logging.disable(logging.CRITICAL) - with mock.patch('six.moves.builtins.open'): - self.feature = feature.Feature( - project_name=self._project_name, case_name=self._case_name) - - def test_run_exc(self): - # pylint: disable=bad-continuation - with mock.patch.object( - self.feature, 'execute', - side_effect=Exception) as mock_method: - self._test_run(testcase.TestCase.EX_RUN_ERROR) - mock_method.assert_called_once_with(cmd=self._cmd) - - def test_run(self): - self._test_run(testcase.TestCase.EX_RUN_ERROR) - - -class BashFeatureTesting(FeatureTestingBase): - - def setUp(self): - # logging must be disabled else it calls time.time() - # what will break these unit tests. - logging.disable(logging.CRITICAL) - with mock.patch('six.moves.builtins.open'): - self.feature = feature.BashFeature( - project_name=self._project_name, case_name=self._case_name) - - @mock.patch('subprocess.Popen') - def test_run_no_cmd(self, mock_subproc): - self.assertEqual( - self.feature.run(), testcase.TestCase.EX_RUN_ERROR) - mock_subproc.assert_not_called() - - @mock.patch('subprocess.Popen') - def test_run_ko(self, mock_subproc): - with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen: - mock_obj = mock.Mock() - attrs = {'wait.return_value': 1} - mock_obj.configure_mock(**attrs) - - mock_subproc.return_value = mock_obj - self._test_run(testcase.TestCase.EX_RUN_ERROR) - mopen.assert_called_once_with(self._output_file, "w+") - - @mock.patch('subprocess.Popen') - def test_run(self, mock_subproc): - with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen: - mock_obj = mock.Mock() - attrs = {'wait.return_value': 0} - mock_obj.configure_mock(**attrs) - - mock_subproc.return_value = mock_obj - self._test_run(testcase.TestCase.EX_OK) - mopen.assert_called_once_with(self._output_file, "w+") - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/test_robotframework.py b/functest/tests/unit/core/test_robotframework.py deleted file mode 100644 index 28fd15f6..00000000 --- a/functest/tests/unit/core/test_robotframework.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2017 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the classes required to fully cover robot.""" - -import errno -import logging -import os -import unittest - -import mock -from robot.errors import DataError, RobotError -from robot.result import model -from robot.utils.robottime import timestamp_to_secs - -from functest.core import robotframework - -__author__ = "Cedric Ollivier " - - -class ResultVisitorTesting(unittest.TestCase): - - """The class testing ResultVisitor.""" - # pylint: disable=missing-docstring - - def setUp(self): - self.visitor = robotframework.ResultVisitor() - - def test_empty(self): - self.assertFalse(self.visitor.get_data()) - - def test_ok(self): - data = {'name': 'foo', - 'parent': 'bar', - 'status': 'PASS', - 'starttime': "20161216 16:00:00.000", - 'endtime': "20161216 16:00:01.000", - 'elapsedtime': 1000, - 'text': 'Hello, World!', - 'critical': True} - test = model.TestCase( - name=data['name'], status=data['status'], message=data['text'], - starttime=data['starttime'], endtime=data['endtime']) - test.parent = mock.Mock() - config = {'name': data['parent'], - 'criticality.test_is_critical.return_value': data[ - 'critical']} - test.parent.configure_mock(**config) - self.visitor.visit_test(test) - self.assertEqual(self.visitor.get_data(), [data]) - - -class ParseResultTesting(unittest.TestCase): - - """The class testing RobotFramework.parse_results().""" - # pylint: disable=missing-docstring - - _config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000', - 'endtime': '20161216 16:00:01.000'} - - def setUp(self): - self.test = robotframework.RobotFramework( - case_name='robot', project_name='functest') - - @mock.patch('robot.api.ExecutionResult', side_effect=DataError) - def test_raises_exc(self, mock_method): - with self.assertRaises(DataError): - self.test.parse_results() - mock_method.assert_called_once_with( - os.path.join(self.test.res_dir, 'output.xml')) - - def _test_result(self, config, result): - suite = mock.Mock() - suite.configure_mock(**config) - with mock.patch('robot.api.ExecutionResult', - return_value=mock.Mock(suite=suite)): - self.test.parse_results() - self.assertEqual(self.test.result, result) - self.assertEqual(self.test.start_time, - timestamp_to_secs(config['starttime'])) - self.assertEqual(self.test.stop_time, - timestamp_to_secs(config['endtime'])) - self.assertEqual(self.test.details, - {'description': config['name'], 'tests': []}) - - def test_null_passed(self): - self._config.update({'statistics.critical.passed': 0, - 'statistics.critical.total': 20}) - self._test_result(self._config, 0) - - def test_no_test(self): - self._config.update({'statistics.critical.passed': 20, - 'statistics.critical.total': 0}) - self._test_result(self._config, 0) - - def test_half_success(self): - self._config.update({'statistics.critical.passed': 10, - 'statistics.critical.total': 20}) - self._test_result(self._config, 50) - - def test_success(self): - self._config.update({'statistics.critical.passed': 20, - 'statistics.critical.total': 20}) - self._test_result(self._config, 100) - - -class RunTesting(unittest.TestCase): - - """The class testing RobotFramework.run().""" - # pylint: disable=missing-docstring - - suites = ["foo"] - variable = [] - variablefile = [] - - def setUp(self): - self.test = robotframework.RobotFramework( - case_name='robot', project_name='functest') - - def test_exc_key_error(self): - self.assertEqual(self.test.run(), self.test.EX_RUN_ERROR) - - @mock.patch('robot.run') - def _test_makedirs_exc(self, *args): - with mock.patch.object(self.test, 'parse_results') as mock_method: - self.assertEqual( - self.test.run( - suites=self.suites, variable=self.variable, - variablefile=self.variablefile), - self.test.EX_RUN_ERROR) - args[0].assert_not_called() - mock_method.asser_not_called() - - @mock.patch('os.makedirs', side_effect=Exception) - def test_makedirs_exc(self, *args): - self._test_makedirs_exc() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('os.makedirs', side_effect=OSError) - def test_makedirs_oserror(self, *args): - self._test_makedirs_exc() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('robot.run') - def _test_makedirs(self, *args): - with mock.patch.object(self.test, 'parse_results') as mock_method: - self.assertEqual( - self.test.run(suites=self.suites, variable=self.variable), - self.test.EX_OK) - args[0].assert_called_once_with( - *self.suites, log='NONE', output=self.test.xml_file, - report='NONE', stdout=mock.ANY, variable=self.variable, - variablefile=self.variablefile) - mock_method.assert_called_once_with() - - @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, '')) - def test_makedirs_oserror17(self, *args): - self._test_makedirs() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('os.makedirs') - def test_makedirs(self, *args): - self._test_makedirs() - args[0].assert_called_once_with(self.test.res_dir) - - @mock.patch('robot.run') - def _test_parse_results(self, status, *args): - self.assertEqual( - self.test.run( - suites=self.suites, variable=self.variable, - variablefile=self.variablefile), - status) - args[0].assert_called_once_with( - *self.suites, log='NONE', output=self.test.xml_file, - report='NONE', stdout=mock.ANY, variable=self.variable, - variablefile=self.variablefile) - - def test_parse_results_exc(self): - with mock.patch.object(self.test, 'parse_results', - side_effect=Exception) as mock_method: - self._test_parse_results(self.test.EX_RUN_ERROR) - mock_method.assert_called_once_with() - - def test_parse_results_robot_error(self): - with mock.patch.object(self.test, 'parse_results', - side_effect=RobotError('foo')) as mock_method: - self._test_parse_results(self.test.EX_RUN_ERROR) - mock_method.assert_called_once_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/core/test_testcase.py b/functest/tests/unit/core/test_testcase.py deleted file mode 100644 index e11e5ff7..00000000 --- a/functest/tests/unit/core/test_testcase.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -"""Define the class required to fully cover testcase.""" - -from datetime import datetime -import json -import logging -import os -import unittest - -from functest.core import testcase - -import mock -import requests - - -__author__ = "Cedric Ollivier " - - -class TestCaseTesting(unittest.TestCase): - """The class testing TestCase.""" - - # pylint: disable=missing-docstring,too-many-public-methods - - _case_name = "base" - _project_name = "functest" - _published_result = "PASS" - _test_db_url = "http://testresults.opnfv.org/test/api/v1/results" - _headers = {'Content-Type': 'application/json'} - - def setUp(self): - self.test = testcase.TestCase(case_name=self._case_name, - project_name=self._project_name) - self.test.start_time = 1 - self.test.stop_time = 2 - self.test.result = 100 - self.test.details = {"Hello": "World"} - os.environ['TEST_DB_URL'] = TestCaseTesting._test_db_url - os.environ['INSTALLER_TYPE'] = "installer_type" - os.environ['DEPLOY_SCENARIO'] = "scenario" - os.environ['NODE_NAME'] = "node_name" - os.environ['BUILD_TAG'] = "foo-daily-master-bar" - - def test_run_unimplemented(self): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_RUN_ERROR) - - def _test_pushdb_missing_attribute(self): - self.assertEqual(self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - - def test_pushdb_no_project_name(self): - self.test.project_name = None - self._test_pushdb_missing_attribute() - - def test_pushdb_no_case_name(self): - self.test.case_name = None - self._test_pushdb_missing_attribute() - - def test_pushdb_no_start_time(self): - self.test.start_time = None - self._test_pushdb_missing_attribute() - - def test_pushdb_no_stop_time(self): - self.test.stop_time = None - self._test_pushdb_missing_attribute() - - def _test_pushdb_missing_env(self, var): - del os.environ[var] - self.assertEqual(self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - - def test_pushdb_no_db_url(self): - self._test_pushdb_missing_env('TEST_DB_URL') - - def test_pushdb_no_installer_type(self): - self._test_pushdb_missing_env('INSTALLER_TYPE') - - def test_pushdb_no_deploy_scenario(self): - self._test_pushdb_missing_env('DEPLOY_SCENARIO') - - def test_pushdb_no_node_name(self): - self._test_pushdb_missing_env('NODE_NAME') - - def test_pushdb_no_build_tag(self): - self._test_pushdb_missing_env('BUILD_TAG') - - @mock.patch('requests.post') - def test_pushdb_bad_start_time(self, mock_function=None): - self.test.start_time = "1" - self.assertEqual( - self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - mock_function.assert_not_called() - - @mock.patch('requests.post') - def test_pushdb_bad_end_time(self, mock_function=None): - self.test.stop_time = "2" - self.assertEqual( - self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - mock_function.assert_not_called() - - def _get_data(self): - return { - "build_tag": os.environ['BUILD_TAG'], - "case_name": self._case_name, - "criteria": 'PASS' if self.test.is_successful( - ) == self.test.EX_OK else 'FAIL', - "details": self.test.details, - "installer": os.environ['INSTALLER_TYPE'], - "pod_name": os.environ['NODE_NAME'], - "project_name": self.test.project_name, - "scenario": os.environ['DEPLOY_SCENARIO'], - "start_date": datetime.fromtimestamp( - self.test.start_time).strftime('%Y-%m-%d %H:%M:%S'), - "stop_date": datetime.fromtimestamp( - self.test.stop_time).strftime('%Y-%m-%d %H:%M:%S'), - "version": "master"} - - @mock.patch('requests.post') - def _test_pushdb_version(self, mock_function=None, **kwargs): - payload = self._get_data() - payload["version"] = kwargs.get("version", "unknown") - self.assertEqual(self.test.push_to_db(), testcase.TestCase.EX_OK) - mock_function.assert_called_once_with( - os.environ['TEST_DB_URL'], - data=json.dumps(payload, sort_keys=True), - headers=self._headers) - - def test_pushdb_daily_job(self): - self._test_pushdb_version(version="master") - - def test_pushdb_weekly_job(self): - os.environ['BUILD_TAG'] = 'foo-weekly-master-bar' - self._test_pushdb_version(version="master") - - def test_pushdb_random_build_tag(self): - os.environ['BUILD_TAG'] = 'whatever' - self._test_pushdb_version(version="unknown") - - @mock.patch('requests.post', return_value=mock.Mock( - raise_for_status=mock.Mock( - side_effect=requests.exceptions.HTTPError))) - def test_pushdb_http_errors(self, mock_function=None): - self.assertEqual( - self.test.push_to_db(), - testcase.TestCase.EX_PUSH_TO_DB_ERROR) - mock_function.assert_called_once_with( - os.environ['TEST_DB_URL'], - data=json.dumps(self._get_data(), sort_keys=True), - headers=self._headers) - - def test_check_criteria_missing(self): - self.test.criteria = None - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_missing(self): - self.test.result = None - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_failed(self): - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - self.test.result = 'FAIL' - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_pass(self): - # Backward compatibility - # It must be removed as soon as TestCase subclasses - # stop setting result = 'PASS' or 'FAIL'. - self.test.result = 'PASS' - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_OK) - - def test_check_result_lt(self): - self.test.result = 50 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_check_result_eq(self): - self.test.result = 100 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_OK) - - def test_check_result_gt(self): - self.test.criteria = 50 - self.test.result = 100 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_OK) - - def test_check_result_zero(self): - self.test.criteria = 0 - self.test.result = 0 - self.assertEqual(self.test.is_successful(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_get_duration_start_ko(self): - self.test.start_time = None - self.assertEqual(self.test.get_duration(), "XX:XX") - self.test.start_time = 0 - self.assertEqual(self.test.get_duration(), "XX:XX") - - def test_get_duration_end_ko(self): - self.test.stop_time = None - self.assertEqual(self.test.get_duration(), "XX:XX") - self.test.stop_time = 0 - self.assertEqual(self.test.get_duration(), "XX:XX") - - def test_get_invalid_duration(self): - self.test.start_time = 2 - self.test.stop_time = 1 - self.assertEqual(self.test.get_duration(), "XX:XX") - - def test_get_zero_duration(self): - self.test.start_time = 2 - self.test.stop_time = 2 - self.assertEqual(self.test.get_duration(), "00:00") - - def test_get_duration(self): - self.test.start_time = 1 - self.test.stop_time = 180 - self.assertEqual(self.test.get_duration(), "02:59") - - def test_str_project_name_ko(self): - self.test.project_name = None - self.assertIn("