From 2aab5c48df64b044ab9bae6e883e6e0acaabbf52 Mon Sep 17 00:00:00 2001 From: Cédric Ollivier Date: Wed, 28 Feb 2018 09:35:49 +0100 Subject: Rename all Functest refs to Xtesting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It mainly renames python modules and then the related documentation config files. Change-Id: I186010bb88d3d39afe7b8fd1ebcef9c690cc1282 Signed-off-by: Cédric Ollivier --- xtesting/__init__.py | 0 xtesting/ci/__init__.py | 0 xtesting/ci/logging.ini | 65 ++++ xtesting/ci/run_tests.py | 302 +++++++++++++++ xtesting/ci/testcases.yaml | 424 +++++++++++++++++++++ xtesting/ci/tier_builder.py | 106 ++++++ xtesting/ci/tier_handler.py | 174 +++++++++ xtesting/core/__init__.py | 0 xtesting/core/feature.py | 133 +++++++ xtesting/core/robotframework.py | 126 ++++++ xtesting/core/testcase.py | 227 +++++++++++ xtesting/core/unit.py | 92 +++++ xtesting/core/vnf.py | 205 ++++++++++ xtesting/energy/__init__.py | 0 xtesting/energy/energy.py | 334 ++++++++++++++++ xtesting/tests/__init__.py | 0 xtesting/tests/unit/__init__.py | 0 xtesting/tests/unit/ci/__init__.py | 0 xtesting/tests/unit/ci/test_run_tests.py | 267 +++++++++++++ xtesting/tests/unit/ci/test_tier_builder.py | 93 +++++ xtesting/tests/unit/ci/test_tier_handler.py | 139 +++++++ xtesting/tests/unit/core/__init__.py | 0 xtesting/tests/unit/core/test_feature.py | 117 ++++++ xtesting/tests/unit/core/test_robotframework.py | 199 ++++++++++ xtesting/tests/unit/core/test_testcase.py | 277 ++++++++++++++ xtesting/tests/unit/core/test_unit.py | 98 +++++ xtesting/tests/unit/core/test_vnf.py | 187 +++++++++ xtesting/tests/unit/energy/__init__.py | 0 xtesting/tests/unit/energy/test_functest_energy.py | 371 ++++++++++++++++++ xtesting/tests/unit/utils/__init__.py | 0 xtesting/tests/unit/utils/test_decorators.py | 125 ++++++ xtesting/tests/unit/utils/test_env.py | 57 +++ xtesting/utils/__init__.py | 0 xtesting/utils/constants.py | 5 + xtesting/utils/decorators.py | 57 +++ xtesting/utils/env.py | 44 +++ 36 files changed, 4224 insertions(+) create mode 100644 xtesting/__init__.py create mode 100644 xtesting/ci/__init__.py create mode 100644 xtesting/ci/logging.ini create mode 100644 xtesting/ci/run_tests.py create mode 100644 xtesting/ci/testcases.yaml create mode 100644 xtesting/ci/tier_builder.py create mode 100644 xtesting/ci/tier_handler.py create mode 100644 xtesting/core/__init__.py create mode 100644 xtesting/core/feature.py create mode 100644 xtesting/core/robotframework.py create mode 100644 xtesting/core/testcase.py create mode 100644 xtesting/core/unit.py create mode 100644 xtesting/core/vnf.py create mode 100644 xtesting/energy/__init__.py create mode 100644 xtesting/energy/energy.py create mode 100644 xtesting/tests/__init__.py create mode 100644 xtesting/tests/unit/__init__.py create mode 100644 xtesting/tests/unit/ci/__init__.py create mode 100644 xtesting/tests/unit/ci/test_run_tests.py create mode 100644 xtesting/tests/unit/ci/test_tier_builder.py create mode 100644 xtesting/tests/unit/ci/test_tier_handler.py create mode 100644 xtesting/tests/unit/core/__init__.py create mode 100644 xtesting/tests/unit/core/test_feature.py create mode 100644 xtesting/tests/unit/core/test_robotframework.py create mode 100644 xtesting/tests/unit/core/test_testcase.py create mode 100644 xtesting/tests/unit/core/test_unit.py create mode 100644 xtesting/tests/unit/core/test_vnf.py create mode 100644 xtesting/tests/unit/energy/__init__.py create mode 100644 xtesting/tests/unit/energy/test_functest_energy.py create mode 100644 xtesting/tests/unit/utils/__init__.py create mode 100644 xtesting/tests/unit/utils/test_decorators.py create mode 100644 xtesting/tests/unit/utils/test_env.py create mode 100644 xtesting/utils/__init__.py create mode 100644 xtesting/utils/constants.py create mode 100644 xtesting/utils/decorators.py create mode 100644 xtesting/utils/env.py (limited to 'xtesting') diff --git a/xtesting/__init__.py b/xtesting/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/ci/__init__.py b/xtesting/ci/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/ci/logging.ini b/xtesting/ci/logging.ini new file mode 100644 index 00000000..ab82073f --- /dev/null +++ b/xtesting/ci/logging.ini @@ -0,0 +1,65 @@ +[loggers] +keys=root,xtesting,api,ci,cli,core,energy,opnfv_tests,utils + +[handlers] +keys=console,wconsole,file,null + +[formatters] +keys=standard + +[logger_root] +level=NOTSET +handlers=null + +[logger_xtesting] +level=NOTSET +handlers=file +qualname=xtesting + +[logger_ci] +level=NOTSET +handlers=console +qualname=xtesting.ci + +[logger_core] +level=NOTSET +handlers=console +qualname=xtesting.core + +[logger_energy] +level=NOTSET +handlers=wconsole +qualname=xtesting.energy + +[logger_utils] +level=NOTSET +handlers=wconsole +qualname=xtesting.utils + +[handler_null] +class=NullHandler +level=NOTSET +formatter=standard +args=() + +[handler_console] +class=StreamHandler +level=INFO +formatter=standard +args=(sys.stdout,) + +[handler_wconsole] +class=StreamHandler +level=WARN +formatter=standard +args=(sys.stdout,) + +[handler_file] +class=FileHandler +level=DEBUG +formatter=standard +args=("/home/opnfv/xtesting/results/xtesting.log",) + +[formatter_standard] +format=%(asctime)s - %(name)s - %(levelname)s - %(message)s +datefmt= diff --git a/xtesting/ci/run_tests.py b/xtesting/ci/run_tests.py new file mode 100644 index 00000000..5c9143a3 --- /dev/null +++ b/xtesting/ci/run_tests.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Ericsson AB and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +""" The entry of running tests: +1) Parses xtesting/ci/testcases.yaml to check which testcase(s) to be run +2) Execute the common operations on every testcase (run, push results to db...) +3) Return the right status code +""" + +import argparse +import importlib +import logging +import logging.config +import os +import re +import sys +import textwrap +import pkg_resources + +import enum +import prettytable +import yaml + +from xtesting.ci import tier_builder +from xtesting.core import testcase +from xtesting.utils import constants +from xtesting.utils import env + +LOGGER = logging.getLogger('xtesting.ci.run_tests') + + +class Result(enum.Enum): + """The overall result in enumerated type""" + # pylint: disable=too-few-public-methods + EX_OK = os.EX_OK + EX_ERROR = -1 + + +class BlockingTestFailed(Exception): + """Exception when the blocking test fails""" + pass + + +class TestNotEnabled(Exception): + """Exception when the test is not enabled""" + pass + + +class RunTestsParser(object): + """Parser to run tests""" + # pylint: disable=too-few-public-methods + + def __init__(self): + self.parser = argparse.ArgumentParser() + self.parser.add_argument("-t", "--test", dest="test", action='store', + help="Test case or tier (group of tests) " + "to be executed. It will run all the test " + "if not specified.") + self.parser.add_argument("-n", "--noclean", help="Do not clean " + "OpenStack resources after running each " + "test (default=false).", + action="store_true") + self.parser.add_argument("-r", "--report", help="Push results to " + "database (default=false).", + action="store_true") + + def parse_args(self, argv=None): + """Parse arguments. + + It can call sys.exit if arguments are incorrect. + + Returns: + the arguments from cmdline + """ + return vars(self.parser.parse_args(argv)) + + +class Runner(object): + """Runner class""" + + def __init__(self): + self.executed_test_cases = {} + self.overall_result = Result.EX_OK + self.clean_flag = True + self.report_flag = False + self.tiers = tier_builder.TierBuilder( + env.get('INSTALLER_TYPE'), + env.get('DEPLOY_SCENARIO'), + pkg_resources.resource_filename('xtesting', 'ci/testcases.yaml')) + + @staticmethod + def source_envfile(rc_file=constants.ENV_FILE): + """Source the env file passed as arg""" + if not os.path.isfile(rc_file): + LOGGER.debug("No env file %s found", rc_file) + return + with open(rc_file, "r") as rcfd: + for line in rcfd: + var = (line.rstrip('"\n').replace('export ', '').split( + "=") if re.search(r'(.*)=(.*)', line) else None) + # The two next lines should be modified as soon as rc_file + # conforms with common rules. Be aware that it could induce + # issues if value starts with ' + if var: + key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0]) + value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:])) + os.environ[key] = value + rcfd.seek(0, 0) + LOGGER.info("Sourcing env file %s\n\n%s", rc_file, rcfd.read()) + + @staticmethod + def get_dict_by_test(testname): + # pylint: disable=bad-continuation,missing-docstring + with open(pkg_resources.resource_filename( + 'xtesting', 'ci/testcases.yaml')) as tyaml: + testcases_yaml = yaml.safe_load(tyaml) + for dic_tier in testcases_yaml.get("tiers"): + for dic_testcase in dic_tier['testcases']: + if dic_testcase['case_name'] == testname: + return dic_testcase + LOGGER.error('Project %s is not defined in testcases.yaml', testname) + return None + + @staticmethod + def get_run_dict(testname): + """Obtain the 'run' block of the testcase from testcases.yaml""" + try: + dic_testcase = Runner.get_dict_by_test(testname) + if not dic_testcase: + LOGGER.error("Cannot get %s's config options", testname) + elif 'run' in dic_testcase: + return dic_testcase['run'] + return None + except Exception: # pylint: disable=broad-except + LOGGER.exception("Cannot get %s's config options", testname) + return None + + def run_test(self, test): + """Run one test case""" + if not test.is_enabled(): + raise TestNotEnabled( + "The test case {} is not enabled".format(test.get_name())) + LOGGER.info("Running test case '%s'...", test.get_name()) + result = testcase.TestCase.EX_RUN_ERROR + run_dict = self.get_run_dict(test.get_name()) + if run_dict: + try: + module = importlib.import_module(run_dict['module']) + cls = getattr(module, run_dict['class']) + test_dict = Runner.get_dict_by_test(test.get_name()) + test_case = cls(**test_dict) + self.executed_test_cases[test.get_name()] = test_case + try: + kwargs = run_dict['args'] + test_case.run(**kwargs) + except KeyError: + test_case.run() + if self.report_flag: + test_case.push_to_db() + if test.get_project() == "xtesting": + result = test_case.is_successful() + else: + result = testcase.TestCase.EX_OK + LOGGER.info("Test result:\n\n%s\n", test_case) + if self.clean_flag: + test_case.clean() + except ImportError: + LOGGER.exception("Cannot import module %s", run_dict['module']) + except AttributeError: + LOGGER.exception("Cannot get class %s", run_dict['class']) + else: + raise Exception("Cannot import the class for the test case.") + return result + + def run_tier(self, tier): + """Run one tier""" + tier_name = tier.get_name() + tests = tier.get_tests() + if not tests: + LOGGER.info("There are no supported test cases in this tier " + "for the given scenario") + self.overall_result = Result.EX_ERROR + else: + LOGGER.info("Running tier '%s'", tier_name) + for test in tests: + self.run_test(test) + test_case = self.executed_test_cases[test.get_name()] + if test_case.is_successful() != testcase.TestCase.EX_OK: + LOGGER.error("The test case '%s' failed.", test.get_name()) + if test.get_project() == "xtesting": + self.overall_result = Result.EX_ERROR + if test.is_blocking(): + raise BlockingTestFailed( + "The test case {} failed and is blocking".format( + test.get_name())) + return self.overall_result + + def run_all(self): + """Run all available testcases""" + tiers_to_run = [] + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['tiers', 'order', 'CI Loop', 'description', + 'testcases']) + for tier in self.tiers.get_tiers(): + ci_loop = env.get('CI_LOOP') + if (tier.get_tests() and + re.search(ci_loop, tier.get_ci_loop()) is not None): + tiers_to_run.append(tier) + msg.add_row([tier.get_name(), tier.get_order(), + tier.get_ci_loop(), + textwrap.fill(tier.description, width=40), + textwrap.fill(' '.join([str(x.get_name( + )) for x in tier.get_tests()]), width=40)]) + LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg) + for tier in tiers_to_run: + self.run_tier(tier) + + def main(self, **kwargs): + """Entry point of class Runner""" + if 'noclean' in kwargs: + self.clean_flag = not kwargs['noclean'] + if 'report' in kwargs: + self.report_flag = kwargs['report'] + try: + LOGGER.info("Deployment description:\n\n%s\n", env.string()) + self.source_envfile() + if 'test' in kwargs: + LOGGER.debug("Test args: %s", kwargs['test']) + if self.tiers.get_tier(kwargs['test']): + self.run_tier(self.tiers.get_tier(kwargs['test'])) + elif self.tiers.get_test(kwargs['test']): + result = self.run_test( + self.tiers.get_test(kwargs['test'])) + if result != testcase.TestCase.EX_OK: + LOGGER.error("The test case '%s' failed.", + kwargs['test']) + self.overall_result = Result.EX_ERROR + elif kwargs['test'] == "all": + self.run_all() + else: + LOGGER.error("Unknown test case or tier '%s', or not " + "supported by the given scenario '%s'.", + kwargs['test'], + env.get('DEPLOY_SCENARIO')) + LOGGER.debug("Available tiers are:\n\n%s", + self.tiers) + return Result.EX_ERROR + else: + self.run_all() + except BlockingTestFailed: + pass + except Exception: # pylint: disable=broad-except + LOGGER.exception("Failures when running testcase(s)") + self.overall_result = Result.EX_ERROR + if not self.tiers.get_test(kwargs['test']): + self.summary(self.tiers.get_tier(kwargs['test'])) + LOGGER.info("Execution exit value: %s", self.overall_result) + return self.overall_result + + def summary(self, tier=None): + """To generate xtesting report showing the overall results""" + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['test case', 'project', 'tier', + 'duration', 'result']) + tiers = [tier] if tier else self.tiers.get_tiers() + for each_tier in tiers: + for test in each_tier.get_tests(): + try: + test_case = self.executed_test_cases[test.get_name()] + except KeyError: + msg.add_row([test.get_name(), test.get_project(), + each_tier.get_name(), "00:00", "SKIP"]) + else: + result = 'PASS' if(test_case.is_successful( + ) == test_case.EX_OK) else 'FAIL' + msg.add_row( + [test_case.case_name, test_case.project_name, + self.tiers.get_tier_name(test_case.case_name), + test_case.get_duration(), result]) + for test in each_tier.get_skipped_test(): + msg.add_row([test.get_name(), test.get_project(), + each_tier.get_name(), "00:00", "SKIP"]) + LOGGER.info("Xtesting report:\n\n%s\n", msg) + + +def main(): + """Entry point""" + logging.config.fileConfig(pkg_resources.resource_filename( + 'xtesting', 'ci/logging.ini')) + logging.captureWarnings(True) + parser = RunTestsParser() + args = parser.parse_args(sys.argv[1:]) + runner = Runner() + return runner.main(**args).value diff --git a/xtesting/ci/testcases.yaml b/xtesting/ci/testcases.yaml new file mode 100644 index 00000000..c338f57e --- /dev/null +++ b/xtesting/ci/testcases.yaml @@ -0,0 +1,424 @@ +--- +tiers: + - + name: healthcheck + order: 0 + ci_loop: '(daily)|(weekly)' + description: >- + First tier to be executed to verify the basic + operations in the VIM. + testcases: + - + case_name: connection_check + project_name: xtesting + criteria: 100 + blocking: true + description: >- + This test case verifies the retrieval of OpenStack clients: + Keystone, Glance, Neutron and Nova and may perform some + simple queries. When the config value of + snaps.use_keystone is True, xtesting must have access to + the cloud's private network. + dependencies: + installer: '^((?!netvirt).)*$' + scenario: '' + run: + module: + 'xtesting.opnfv_tests.openstack.snaps.connection_check' + class: 'ConnectionCheck' + + - + case_name: api_check + project_name: xtesting + criteria: 100 + blocking: true + description: >- + This test case verifies the retrieval of OpenStack clients: + Keystone, Glance, Neutron and Nova and may perform some + simple queries. When the config value of + snaps.use_keystone is True, xtesting must have access to + the cloud's private network. + dependencies: + installer: '^((?!netvirt).)*$' + scenario: '^((?!lxd).)*$' + run: + module: 'xtesting.opnfv_tests.openstack.snaps.api_check' + class: 'ApiCheck' + + - + case_name: snaps_health_check + project_name: xtesting + criteria: 100 + blocking: true + description: >- + This test case creates executes the SimpleHealthCheck + Python test class which creates an, image, flavor, network, + and Cirros VM instance and observes the console output to + validate the single port obtains the correct IP address. + dependencies: + installer: '' + scenario: '^((?!lxd).)*$' + run: + module: 'xtesting.opnfv_tests.openstack.snaps.health_check' + class: 'HealthCheck' + + - + name: smoke + order: 1 + ci_loop: '(daily)|(weekly)' + description: >- + Set of basic Functional tests to validate the OPNFV scenarios. + testcases: + - + case_name: vping_ssh + project_name: xtesting + criteria: 100 + blocking: true + description: >- + This test case verifies: 1) SSH to an instance using + floating IPs over the public network. 2) Connectivity + between 2 instances over a private network. + dependencies: + installer: '' + scenario: '^((?!odl_l3|odl-bgpvpn|gluon).)*$' + run: + module: 'xtesting.opnfv_tests.openstack.vping.vping_ssh' + class: 'VPingSSH' + + - + case_name: vping_userdata + project_name: xtesting + criteria: 100 + blocking: true + description: >- + This test case verifies: 1) Boot a VM with given userdata. + 2) Connectivity between 2 instances over a private network. + dependencies: + installer: '' + scenario: '^((?!lxd).)*$' + run: + module: + 'xtesting.opnfv_tests.openstack.vping.vping_userdata' + class: 'VPingUserdata' + + - + case_name: tempest_smoke_serial + project_name: xtesting + criteria: 100 + blocking: false + description: >- + This test case runs the smoke subset of the OpenStack + Tempest suite. The list of test cases is generated by + Tempest automatically and depends on the parameters of + the OpenStack deplopyment. + dependencies: + installer: '^((?!netvirt).)*$' + scenario: '' + run: + module: 'xtesting.opnfv_tests.openstack.tempest.tempest' + class: 'TempestSmokeSerial' + + - + case_name: rally_sanity + project_name: xtesting + criteria: 100 + blocking: false + description: >- + This test case runs a sub group of tests of the OpenStack + Rally suite in smoke mode. + dependencies: + installer: '' + scenario: '' + run: + module: 'xtesting.opnfv_tests.openstack.rally.rally' + class: 'RallySanity' + + - + case_name: refstack_defcore + project_name: xtesting + criteria: 100 + blocking: false + description: >- + This test case runs a sub group of tests of the OpenStack + Defcore testcases by using refstack client. + dependencies: + installer: '' + scenario: '' + run: + module: + 'xtesting.opnfv_tests.openstack.refstack_client.refstack_client' + class: 'RefstackClient' + + - + case_name: odl + project_name: xtesting + criteria: 100 + blocking: false + description: >- + Test Suite for the OpenDaylight SDN Controller. It + integrates some test suites from upstream using + Robot as the test framework. + dependencies: + installer: '' + scenario: 'odl' + run: + module: 'xtesting.opnfv_tests.sdn.odl.odl' + class: 'ODLTests' + args: + suites: + - /src/odl_test/csit/suites/integration/basic + - /src/odl_test/csit/suites/openstack/neutron + + - + case_name: odl_netvirt + project_name: xtesting + criteria: 100 + blocking: false + description: >- + Test Suite for the OpenDaylight SDN Controller when + the NetVirt features are installed. It integrates + some test suites from upstream using Robot as the + test framework. + dependencies: + installer: 'apex' + scenario: 'os-odl_l3-nofeature' + run: + module: 'xtesting.opnfv_tests.sdn.odl.odl' + class: 'ODLTests' + args: + suites: + - /src/odl_test/csit/suites/integration/basic + - /src/odl_test/csit/suites/openstack/neutron + - /src/odl_test/csit/suites/openstack/connectivity + + - + case_name: snaps_smoke + project_name: xtesting + criteria: 100 + blocking: false + description: >- + This test case contains tests that setup and destroy + environments with VMs with and without Floating IPs + with a newly created user and project. Set the config + value snaps.use_floating_ips (True|False) to toggle + this functionality. When the config value of + snaps.use_keystone is True, xtesting must have access to + the cloud's private network. + + dependencies: + installer: '^((?!netvirt).)*$' + scenario: '^((?!lxd).)*$' + run: + module: 'xtesting.opnfv_tests.openstack.snaps.smoke' + class: 'SnapsSmoke' + + - + name: features + order: 2 + ci_loop: '(daily)|(weekly)' + description: >- + Test suites from feature projects + integrated in xtesting + testcases: + - + case_name: doctor-notification + project_name: doctor + criteria: 100 + blocking: false + description: >- + Test suite from Doctor project. + dependencies: + installer: 'apex' + scenario: '^((?!fdio).)*$' + run: + module: 'xtesting.core.feature' + class: 'BashFeature' + args: + cmd: 'doctor-test' + + - + case_name: bgpvpn + project_name: sdnvpn + criteria: 100 + blocking: false + description: >- + Test suite from SDNVPN project. + dependencies: + installer: '(fuel)|(apex)|(netvirt)' + scenario: 'bgpvpn' + run: + module: 'sdnvpn.test.xtesting.run_sdnvpn_tests' + class: 'SdnvpnFunctest' + + - + case_name: xtesting-odl-sfc + project_name: sfc + criteria: 100 + blocking: false + description: >- + Test suite for odl-sfc to test two chains with one SF and + one chain with two SFs + dependencies: + installer: '' + scenario: 'odl.*sfc' + run: + module: 'xtesting.core.feature' + class: 'BashFeature' + args: + cmd: 'run_sfc_tests.py' + + - + case_name: barometercollectd + project_name: barometer + criteria: 100 + blocking: false + description: >- + Test suite for the Barometer project. Separate tests verify + the proper configuration and basic functionality of all the + collectd plugins as described in the Project Release Plan + dependencies: + installer: 'apex' + scenario: 'bar' + run: + module: 'baro_tests.barometer' + class: 'BarometerCollectd' + + - + case_name: fds + project_name: fastdatastacks + criteria: 100 + blocking: false + description: >- + Test Suite for the OpenDaylight SDN Controller when GBP + features are installed. It integrates some test suites from + upstream using Robot as the test framework. + dependencies: + installer: 'apex' + scenario: 'odl.*-fdio' + run: + module: 'xtesting.opnfv_tests.sdn.odl.odl' + class: 'ODLTests' + args: + suites: + - /src/fds/testing/robot + + - + name: components + order: 3 + ci_loop: 'weekly' + description: >- + Extensive testing of OpenStack API. + testcases: + - + case_name: tempest_full_parallel + project_name: xtesting + criteria: 80 + blocking: false + description: >- + The list of test cases is generated by + Tempest automatically and depends on the parameters of + the OpenStack deplopyment. + dependencies: + installer: '^((?!netvirt).)*$' + scenario: '' + run: + module: 'xtesting.opnfv_tests.openstack.tempest.tempest' + class: 'TempestFullParallel' + + - + case_name: rally_full + project_name: xtesting + criteria: 100 + blocking: false + description: >- + This test case runs the full suite of scenarios of the + OpenStack Rally suite using several threads and iterations. + dependencies: + installer: '^((?!netvirt).)*$' + scenario: '' + run: + module: 'xtesting.opnfv_tests.openstack.rally.rally' + class: 'RallyFull' + + - + name: vnf + order: 4 + ci_loop: '(daily)|(weekly)' + description: >- + Collection of VNF test cases. + testcases: + - + case_name: cloudify_ims + project_name: xtesting + criteria: 80 + blocking: false + description: >- + This test case deploys an OpenSource vIMS solution from + Clearwater using the Cloudify orchestrator. It also runs + some signaling traffic. + dependencies: + installer: '' + scenario: 'os-nosdn-nofeature-.*ha' + run: + module: 'xtesting.opnfv_tests.vnf.ims.cloudify_ims' + class: 'CloudifyIms' + + - + case_name: vyos_vrouter + project_name: xtesting + criteria: 100 + blocking: false + description: >- + This test case is vRouter testing. + dependencies: + installer: '' + scenario: 'os-nosdn-nofeature-.*ha' + run: + module: 'xtesting.opnfv_tests.vnf.router.cloudify_vrouter' + class: 'CloudifyVrouter' + + - + case_name: orchestra_openims + project_name: orchestra + enabled: false + criteria: 100 + blocking: false + description: >- + OpenIMS VNF deployment with Open Baton (Orchestra) + dependencies: + installer: '' + scenario: 'os-nosdn-nofeature-.*ha' + run: + module: 'xtesting.opnfv_tests.vnf.ims.orchestra_openims' + class: 'OpenImsVnf' + + - + case_name: orchestra_clearwaterims + project_name: orchestra + enabled: false + criteria: 100 + blocking: false + description: >- + ClearwaterIMS VNF deployment with Open Baton (Orchestra) + dependencies: + installer: '' + scenario: 'os-nosdn-nofeature-.*ha' + run: + module: + 'xtesting.opnfv_tests.vnf.ims.orchestra_clearwaterims' + class: 'ClearwaterImsVnf' + + - + case_name: juju_epc + project_name: xtesting + criteria: 100 + blocking: false + description: >- + vEPC validation with Juju as VNF manager and ABoT as test + executor. + dependencies: + installer: '' + scenario: 'os-nosdn-nofeature-.*ha' + run: + module: 'xtesting.opnfv_tests.vnf.epc.juju_epc' + class: 'JujuEpc' diff --git a/xtesting/ci/tier_builder.py b/xtesting/ci/tier_builder.py new file mode 100644 index 00000000..2c7b0cab --- /dev/null +++ b/xtesting/ci/tier_builder.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Ericsson AB and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""TierBuilder class to parse testcases config file""" + +import yaml + +import xtesting.ci.tier_handler as th + + +class TierBuilder(object): + # pylint: disable=missing-docstring + + def __init__(self, ci_installer, ci_scenario, testcases_file): + self.ci_installer = ci_installer + self.ci_scenario = ci_scenario + self.testcases_file = testcases_file + self.dic_tier_array = None + self.tier_objects = [] + self.testcases_yaml = None + self.generate_tiers() + + def read_test_yaml(self): + with open(self.testcases_file) as tc_file: + self.testcases_yaml = yaml.safe_load(tc_file) + + self.dic_tier_array = [] + for tier in self.testcases_yaml.get("tiers"): + self.dic_tier_array.append(tier) + + def generate_tiers(self): + if self.dic_tier_array is None: + self.read_test_yaml() + + del self.tier_objects[:] + for dic_tier in self.dic_tier_array: + tier = th.Tier( + name=dic_tier['name'], order=dic_tier['order'], + ci_loop=dic_tier['ci_loop'], + description=dic_tier['description']) + + for dic_testcase in dic_tier['testcases']: + installer = dic_testcase['dependencies']['installer'] + scenario = dic_testcase['dependencies']['scenario'] + dep = th.Dependency(installer, scenario) + + testcase = th.TestCase( + name=dic_testcase['case_name'], + enabled=dic_testcase.get('enabled', True), + dependency=dep, criteria=dic_testcase['criteria'], + blocking=dic_testcase['blocking'], + description=dic_testcase['description'], + project=dic_testcase['project_name']) + if (testcase.is_compatible(self.ci_installer, + self.ci_scenario) and + testcase.is_enabled()): + tier.add_test(testcase) + else: + tier.skip_test(testcase) + + self.tier_objects.append(tier) + + def get_tiers(self): + return self.tier_objects + + def get_tier_names(self): + tier_names = [] + for tier in self.tier_objects: + tier_names.append(tier.get_name()) + return tier_names + + def get_tier(self, tier_name): + for i in range(0, len(self.tier_objects)): + if self.tier_objects[i].get_name() == tier_name: + return self.tier_objects[i] + return None + + def get_tier_name(self, test_name): + for i in range(0, len(self.tier_objects)): + if self.tier_objects[i].is_test(test_name): + return self.tier_objects[i].name + return None + + def get_test(self, test_name): + for i in range(0, len(self.tier_objects)): + if self.tier_objects[i].is_test(test_name): + return self.tier_objects[i].get_test(test_name) + return None + + def get_tests(self, tier_name): + for i in range(0, len(self.tier_objects)): + if self.tier_objects[i].get_name() == tier_name: + return self.tier_objects[i].get_tests() + return None + + def __str__(self): + output = "" + for i in range(0, len(self.tier_objects)): + output += str(self.tier_objects[i]) + "\n" + return output diff --git a/xtesting/ci/tier_handler.py b/xtesting/ci/tier_handler.py new file mode 100644 index 00000000..9fc3f24d --- /dev/null +++ b/xtesting/ci/tier_handler.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Ericsson AB and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Tier and TestCase classes to wrap the testcases config file""" +# pylint: disable=missing-docstring + +import re +import textwrap + +import prettytable + + +LINE_LENGTH = 72 + + +def split_text(text, max_len): + words = text.split() + lines = [] + line = "" + for word in words: + if len(line) + len(word) < max_len - 1: + line += word + " " + else: + lines.append(line) + line = word + " " + if line != "": + lines.append(line) + return lines + + +class Tier(object): + + def __init__(self, name, order, ci_loop, description=""): + self.tests_array = [] + self.skipped_tests_array = [] + self.name = name + self.order = order + self.ci_loop = ci_loop + self.description = description + + def add_test(self, testcase): + self.tests_array.append(testcase) + + def skip_test(self, testcase): + self.skipped_tests_array.append(testcase) + + def get_tests(self): + array_tests = [] + for test in self.tests_array: + array_tests.append(test) + return array_tests + + def get_skipped_test(self): + return self.skipped_tests_array + + def get_test_names(self): + array_tests = [] + for test in self.tests_array: + array_tests.append(test.get_name()) + return array_tests + + def get_test(self, test_name): + if self.is_test(test_name): + for test in self.tests_array: + if test.get_name() == test_name: + return test + return None + + def is_test(self, test_name): + for test in self.tests_array: + if test.get_name() == test_name: + return True + return False + + def get_name(self): + return self.name + + def get_order(self): + return self.order + + def get_ci_loop(self): + return self.ci_loop + + def __str__(self): + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['tiers', 'order', 'CI Loop', 'description', + 'testcases']) + msg.add_row( + [self.name, self.order, self.ci_loop, + textwrap.fill(self.description, width=40), + textwrap.fill(' '.join([str(x.get_name( + )) for x in self.get_tests()]), width=40)]) + return msg.get_string() + + +class TestCase(object): + + def __init__(self, name, enabled, dependency, criteria, blocking, + description="", project=""): + # pylint: disable=too-many-arguments + self.name = name + self.enabled = enabled + self.dependency = dependency + self.criteria = criteria + self.blocking = blocking + self.description = description + self.project = project + + @staticmethod + def is_none(item): + return item is None or item == "" + + def is_compatible(self, ci_installer, ci_scenario): + try: + if not self.is_none(ci_installer): + if re.search(self.dependency.get_installer(), + ci_installer) is None: + return False + if not self.is_none(ci_scenario): + if re.search(self.dependency.get_scenario(), + ci_scenario) is None: + return False + return True + except TypeError: + return False + + def get_name(self): + return self.name + + def is_enabled(self): + return self.enabled + + def get_criteria(self): + return self.criteria + + def is_blocking(self): + return self.blocking + + def get_project(self): + return self.project + + def __str__(self): + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['test case', 'description', 'criteria', 'dependency']) + msg.add_row([self.name, textwrap.fill(self.description, width=40), + self.criteria, self.dependency]) + return msg.get_string() + + +class Dependency(object): + + def __init__(self, installer, scenario): + self.installer = installer + self.scenario = scenario + + def get_installer(self): + return self.installer + + def get_scenario(self): + return self.scenario + + def __str__(self): + delimitator = "\n" if self.get_installer( + ) and self.get_scenario() else "" + return "{}{}{}".format(self.get_installer(), delimitator, + self.get_scenario()) diff --git a/xtesting/core/__init__.py b/xtesting/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/core/feature.py b/xtesting/core/feature.py new file mode 100644 index 00000000..d3f86c02 --- /dev/null +++ b/xtesting/core/feature.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 ZTE Corp and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define the parent classes of all Xtesting Features. + +Feature is considered as TestCase offered by Third-party. It offers +helpers to run any python method or any bash command. +""" + +import logging +import subprocess +import time + +from xtesting.core import testcase + +__author__ = ("Serena Feng , " + "Cedric Ollivier ") + + +class Feature(testcase.TestCase): + """Base model for single feature.""" + + __logger = logging.getLogger(__name__) + dir_results = "/home/opnfv/xtesting/results" + + def __init__(self, **kwargs): + super(Feature, self).__init__(**kwargs) + self.result_file = "{}/{}.log".format(self.dir_results, self.case_name) + try: + module = kwargs['run']['module'] + self.logger = logging.getLogger(module) + except KeyError: + self.__logger.warning( + "Cannot get module name %s. Using %s as fallback", + kwargs, self.case_name) + self.logger = logging.getLogger(self.case_name) + handler = logging.StreamHandler() + handler.setLevel(logging.WARN) + self.logger.addHandler(handler) + handler = logging.FileHandler(self.result_file) + handler.setLevel(logging.DEBUG) + self.logger.addHandler(handler) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + + def execute(self, **kwargs): + """Execute the Python method. + + The subclasses must override the default implementation which + is false on purpose. + + The new implementation must return 0 if success or anything + else if failure. + + Args: + kwargs: Arbitrary keyword arguments. + + Returns: + -1. + """ + # pylint: disable=unused-argument,no-self-use + return -1 + + def run(self, **kwargs): + """Run the feature. + + It allows executing any Python method by calling execute(). + + It sets the following attributes required to push the results + to DB: + + * result, + * start_time, + * stop_time. + + It doesn't fulfill details when pushing the results to the DB. + + Args: + kwargs: Arbitrary keyword arguments. + + Returns: + TestCase.EX_OK if execute() returns 0, + TestCase.EX_RUN_ERROR otherwise. + """ + self.start_time = time.time() + exit_code = testcase.TestCase.EX_RUN_ERROR + self.result = 0 + try: + if self.execute(**kwargs) == 0: + exit_code = testcase.TestCase.EX_OK + self.result = 100 + except Exception: # pylint: disable=broad-except + self.__logger.exception("%s FAILED", self.project_name) + self.__logger.info("Test result is stored in '%s'", self.result_file) + self.stop_time = time.time() + return exit_code + + +class BashFeature(Feature): + """Class designed to run any bash command.""" + + __logger = logging.getLogger(__name__) + + def execute(self, **kwargs): + """Execute the cmd passed as arg + + Args: + kwargs: Arbitrary keyword arguments. + + Returns: + 0 if cmd returns 0, + -1 otherwise. + """ + ret = -1 + try: + cmd = kwargs["cmd"] + with open(self.result_file, 'w+') as f_stdout: + proc = subprocess.Popen(cmd.split(), stdout=f_stdout, + stderr=subprocess.STDOUT) + ret = proc.wait() + if ret != 0: + self.__logger.error("Execute command: %s failed", cmd) + except KeyError: + self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs) + return ret diff --git a/xtesting/core/robotframework.py b/xtesting/core/robotframework.py new file mode 100644 index 00000000..4d3746aa --- /dev/null +++ b/xtesting/core/robotframework.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +# Copyright (c) 2017 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define classes required to run any Robot suites.""" + +from __future__ import division + +import errno +import logging +import os + +import robot.api +from robot.errors import RobotError +import robot.run +from robot.utils.robottime import timestamp_to_secs +from six import StringIO + +from xtesting.core import testcase + +__author__ = "Cedric Ollivier " + + +class ResultVisitor(robot.api.ResultVisitor): + """Visitor to get result details.""" + + def __init__(self): + self._data = [] + + def visit_test(self, test): + output = {} + output['name'] = test.name + output['parent'] = test.parent.name + output['status'] = test.status + output['starttime'] = test.starttime + output['endtime'] = test.endtime + output['critical'] = test.critical + output['text'] = test.message + output['elapsedtime'] = test.elapsedtime + self._data.append(output) + + def get_data(self): + """Get the details of the result.""" + return self._data + + +class RobotFramework(testcase.TestCase): + """RobotFramework runner.""" + + __logger = logging.getLogger(__name__) + dir_results = "/home/opnfv/xtesting/results" + + def __init__(self, **kwargs): + self.res_dir = os.path.join(self.dir_results, 'robot') + self.xml_file = os.path.join(self.res_dir, 'output.xml') + super(RobotFramework, self).__init__(**kwargs) + + def parse_results(self): + """Parse output.xml and get the details in it.""" + result = robot.api.ExecutionResult(self.xml_file) + visitor = ResultVisitor() + result.visit(visitor) + try: + self.result = 100 * ( + result.suite.statistics.critical.passed / + result.suite.statistics.critical.total) + except ZeroDivisionError: + self.__logger.error("No test has been run") + self.start_time = timestamp_to_secs(result.suite.starttime) + self.stop_time = timestamp_to_secs(result.suite.endtime) + self.details = {} + self.details['description'] = result.suite.name + self.details['tests'] = visitor.get_data() + + def run(self, **kwargs): + """Run the RobotFramework suites + + Here are the steps: + * create the output directories if required, + * get the results in output.xml, + * delete temporary files. + + Args: + kwargs: Arbitrary keyword arguments. + + Returns: + EX_OK if all suites ran well. + EX_RUN_ERROR otherwise. + """ + try: + suites = kwargs["suites"] + variable = kwargs.get("variable", []) + variablefile = kwargs.get("variablefile", []) + except KeyError: + self.__logger.exception("Mandatory args were not passed") + return self.EX_RUN_ERROR + try: + os.makedirs(self.res_dir) + except OSError as ex: + if ex.errno != errno.EEXIST: + self.__logger.exception("Cannot create %s", self.res_dir) + return self.EX_RUN_ERROR + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot create %s", self.res_dir) + return self.EX_RUN_ERROR + stream = StringIO() + robot.run(*suites, variable=variable, variablefile=variablefile, + output=self.xml_file, log='NONE', + report='NONE', stdout=stream) + self.__logger.info("\n" + stream.getvalue()) + self.__logger.info("Results were successfully generated") + try: + self.parse_results() + self.__logger.info("Results were successfully parsed") + except RobotError as ex: + self.__logger.error("Run suites before publishing: %s", ex.message) + return self.EX_RUN_ERROR + except Exception: # pylint: disable=broad-except + self.__logger.exception("Cannot parse results") + return self.EX_RUN_ERROR + return self.EX_OK diff --git a/xtesting/core/testcase.py b/xtesting/core/testcase.py new file mode 100644 index 00000000..4effa932 --- /dev/null +++ b/xtesting/core/testcase.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define the parent class of all Xtesting TestCases.""" + +from datetime import datetime +import json +import logging +import os +import re +import requests + +from xtesting.utils import decorators +from xtesting.utils import env + + +import prettytable + + +__author__ = "Cedric Ollivier " + + +class TestCase(object): + """Base model for single test case.""" + + EX_OK = os.EX_OK + """everything is OK""" + + EX_RUN_ERROR = os.EX_SOFTWARE + """run() failed""" + + EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1 + """push_to_db() failed""" + + EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2 + """results are false""" + + _job_name_rule = "(dai|week)ly-(.+?)-[0-9]*" + _headers = {'Content-Type': 'application/json'} + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + self.details = {} + self.project_name = kwargs.get('project_name', 'xtesting') + self.case_name = kwargs.get('case_name', '') + self.criteria = kwargs.get('criteria', 100) + self.result = 0 + self.start_time = 0 + self.stop_time = 0 + + def __str__(self): + try: + assert self.project_name + assert self.case_name + result = 'PASS' if(self.is_successful( + ) == TestCase.EX_OK) else 'FAIL' + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['test case', 'project', 'duration', + 'result']) + msg.add_row([self.case_name, self.project_name, + self.get_duration(), result]) + return msg.get_string() + except AssertionError: + self.__logger.error("We cannot print invalid objects") + return super(TestCase, self).__str__() + + def get_duration(self): + """Return the duration of the test case. + + Returns: + duration if start_time and stop_time are set + "XX:XX" otherwise. + """ + try: + assert self.start_time + assert self.stop_time + if self.stop_time < self.start_time: + return "XX:XX" + return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod( + self.stop_time - self.start_time, 60)) + except Exception: # pylint: disable=broad-except + self.__logger.error("Please run test before getting the duration") + return "XX:XX" + + def is_successful(self): + """Interpret the result of the test case. + + It allows getting the result of TestCase. It completes run() + which only returns the execution status. + + It can be overriden if checking result is not suitable. + + Returns: + TestCase.EX_OK if result is 'PASS'. + TestCase.EX_TESTCASE_FAILED otherwise. + """ + try: + assert self.criteria + assert self.result is not None + if (not isinstance(self.result, str) and + not isinstance(self.criteria, str)): + if self.result >= self.criteria: + return TestCase.EX_OK + else: + # Backward compatibility + # It must be removed as soon as TestCase subclasses + # stop setting result = 'PASS' or 'FAIL'. + # In this case criteria is unread. + self.__logger.warning( + "Please update result which must be an int!") + if self.result == 'PASS': + return TestCase.EX_OK + except AssertionError: + self.__logger.error("Please run test before checking the results") + return TestCase.EX_TESTCASE_FAILED + + def run(self, **kwargs): + """Run the test case. + + It allows running TestCase and getting its execution + status. + + The subclasses must override the default implementation which + is false on purpose. + + The new implementation must set the following attributes to + push the results to DB: + + * result, + * start_time, + * stop_time. + + Args: + kwargs: Arbitrary keyword arguments. + + Returns: + TestCase.EX_RUN_ERROR. + """ + # pylint: disable=unused-argument + self.__logger.error("Run must be implemented") + return TestCase.EX_RUN_ERROR + + @decorators.can_dump_request_to_file + def push_to_db(self): + """Push the results of the test case to the DB. + + It allows publishing the results and checking the status. + + It could be overriden if the common implementation is not + suitable. + + The following attributes must be set before pushing the results to DB: + + * project_name, + * case_name, + * result, + * start_time, + * stop_time. + + The next vars must be set in env: + + * TEST_DB_URL, + * INSTALLER_TYPE, + * DEPLOY_SCENARIO, + * NODE_NAME, + * BUILD_TAG. + + Returns: + TestCase.EX_OK if results were pushed to DB. + TestCase.EX_PUSH_TO_DB_ERROR otherwise. + """ + try: + assert self.project_name + assert self.case_name + assert self.start_time + assert self.stop_time + url = env.get('TEST_DB_URL') + data = {"project_name": self.project_name, + "case_name": self.case_name, + "details": self.details} + data["installer"] = env.get('INSTALLER_TYPE') + data["scenario"] = env.get('DEPLOY_SCENARIO') + data["pod_name"] = env.get('NODE_NAME') + data["build_tag"] = env.get('BUILD_TAG') + data["criteria"] = 'PASS' if self.is_successful( + ) == TestCase.EX_OK else 'FAIL' + data["start_date"] = datetime.fromtimestamp( + self.start_time).strftime('%Y-%m-%d %H:%M:%S') + data["stop_date"] = datetime.fromtimestamp( + self.stop_time).strftime('%Y-%m-%d %H:%M:%S') + try: + data["version"] = re.search( + TestCase._job_name_rule, + env.get('BUILD_TAG')).group(2) + except Exception: # pylint: disable=broad-except + data["version"] = "unknown" + req = requests.post( + url, data=json.dumps(data, sort_keys=True), + headers=self._headers) + req.raise_for_status() + self.__logger.info( + "The results were successfully pushed to DB %s", url) + except AssertionError: + self.__logger.exception( + "Please run test before publishing the results") + return TestCase.EX_PUSH_TO_DB_ERROR + except requests.exceptions.HTTPError: + self.__logger.exception("The HTTP request raises issues") + return TestCase.EX_PUSH_TO_DB_ERROR + except Exception: # pylint: disable=broad-except + self.__logger.exception("The results cannot be pushed to DB") + return TestCase.EX_PUSH_TO_DB_ERROR + return TestCase.EX_OK + + def clean(self): + """Clean the resources. + + It can be overriden if resources must be deleted after + running the test case. + """ diff --git a/xtesting/core/unit.py b/xtesting/core/unit.py new file mode 100644 index 00000000..27773679 --- /dev/null +++ b/xtesting/core/unit.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Cable Television Laboratories, Inc. and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define the parent class to run unittest.TestSuite as TestCase.""" + +from __future__ import division + +import logging +import time +import unittest + +import six + +from xtesting.core import testcase + +__author__ = ("Steven Pisarski , " + "Cedric Ollivier ") + + +class Suite(testcase.TestCase): + """Base model for running unittest.TestSuite.""" + + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + super(Suite, self).__init__(**kwargs) + self.suite = None + + def run(self, **kwargs): + """Run the test suite. + + It allows running any unittest.TestSuite and getting its + execution status. + + By default, it runs the suite defined as instance attribute. + It can be overriden by passing name as arg. It must + conform with TestLoader.loadTestsFromName(). + + It sets the following attributes required to push the results + to DB: + + * result, + * start_time, + * stop_time, + * details. + + Args: + kwargs: Arbitrary keyword arguments. + + Returns: + TestCase.EX_OK if any TestSuite has been run, + TestCase.EX_RUN_ERROR otherwise. + """ + try: + name = kwargs["name"] + try: + self.suite = unittest.TestLoader().loadTestsFromName(name) + except ImportError: + self.__logger.error("Can not import %s", name) + return testcase.TestCase.EX_RUN_ERROR + except KeyError: + pass + try: + assert self.suite + self.start_time = time.time() + stream = six.StringIO() + result = unittest.TextTestRunner( + stream=stream, verbosity=2).run(self.suite) + self.__logger.debug("\n\n%s", stream.getvalue()) + self.stop_time = time.time() + self.details = { + "testsRun": result.testsRun, + "failures": len(result.failures), + "errors": len(result.errors), + "stream": stream.getvalue()} + self.result = 100 * ( + (result.testsRun - (len(result.failures) + + len(result.errors))) / + result.testsRun) + return testcase.TestCase.EX_OK + except AssertionError: + self.__logger.error("No suite is defined") + return testcase.TestCase.EX_RUN_ERROR + except ZeroDivisionError: + self.__logger.error("No test has been run") + return testcase.TestCase.EX_RUN_ERROR diff --git a/xtesting/core/vnf.py b/xtesting/core/vnf.py new file mode 100644 index 00000000..95ebde04 --- /dev/null +++ b/xtesting/core/vnf.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define the parent class of all VNF TestCases.""" + +import logging +import time +import uuid + +from snaps.config.user import UserConfig +from snaps.config.project import ProjectConfig +from snaps.openstack.create_user import OpenStackUser +from snaps.openstack.create_project import OpenStackProject +from snaps.openstack.tests import openstack_tests + +from xtesting.core import testcase +from xtesting.utils import constants + +__author__ = ("Morgan Richomme , " + "Valentin Boucher ") + + +class VnfPreparationException(Exception): + """Raise when VNF preparation cannot be executed.""" + + +class OrchestratorDeploymentException(Exception): + """Raise when orchestrator cannot be deployed.""" + + +class VnfDeploymentException(Exception): + """Raise when VNF cannot be deployed.""" + + +class VnfTestException(Exception): + """Raise when VNF cannot be tested.""" + + +class VnfOnBoarding(testcase.TestCase): + # pylint: disable=too-many-instance-attributes + """Base model for VNF test cases.""" + + __logger = logging.getLogger(__name__) + + def __init__(self, **kwargs): + super(VnfOnBoarding, self).__init__(**kwargs) + self.uuid = uuid.uuid4() + self.user_name = "{}-{}".format(self.case_name, self.uuid) + self.tenant_name = "{}-{}".format(self.case_name, self.uuid) + self.snaps_creds = {} + self.created_object = [] + self.os_project = None + self.tenant_description = "Created by OPNFV Functest: {}".format( + self.case_name) + + def run(self, **kwargs): + """ + Run of the VNF test case: + + * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...), + * Deploy the VNF, + * Perform tests on the VNF + + A VNF test case is successfull when the 3 steps are PASS + If one of the step is FAIL, the test case is FAIL + + Returns: + TestCase.EX_OK if result is 'PASS'. + TestCase.EX_TESTCASE_FAILED otherwise. + """ + self.start_time = time.time() + + try: + self.prepare() + if (self.deploy_orchestrator() and + self.deploy_vnf() and + self.test_vnf()): + self.stop_time = time.time() + # Calculation with different weight depending on the steps TODO + self.result = 100 + return testcase.TestCase.EX_OK + self.result = 0 + self.stop_time = time.time() + return testcase.TestCase.EX_TESTCASE_FAILED + except Exception: # pylint: disable=broad-except + self.stop_time = time.time() + self.__logger.exception("Exception on VNF testing") + return testcase.TestCase.EX_TESTCASE_FAILED + + def prepare(self): + """ + Prepare the environment for VNF testing: + + * Creation of a user, + * Creation of a tenant, + * Allocation admin role to the user on this tenant + + Returns base.TestCase.EX_OK if preparation is successfull + + Raise VnfPreparationException in case of problem + """ + try: + self.__logger.info( + "Prepare VNF: %s, description: %s", self.case_name, + self.tenant_description) + snaps_creds = openstack_tests.get_credentials( + os_env_file=constants.ENV_FILE) + + self.os_project = OpenStackProject( + snaps_creds, + ProjectConfig( + name=self.tenant_name, + description=self.tenant_description + )) + self.os_project.create() + self.created_object.append(self.os_project) + user_creator = OpenStackUser( + snaps_creds, + UserConfig( + name=self.user_name, + password=str(uuid.uuid4()), + roles={'admin': self.tenant_name})) + user_creator.create() + self.created_object.append(user_creator) + self.snaps_creds = user_creator.get_os_creds(self.tenant_name) + + return testcase.TestCase.EX_OK + except Exception: # pylint: disable=broad-except + self.__logger.exception("Exception raised during VNF preparation") + raise VnfPreparationException + + def deploy_orchestrator(self): + """ + Deploy an orchestrator (optional). + + If this method is overriden then raise orchestratorDeploymentException + if error during orchestrator deployment + """ + self.__logger.info("Deploy orchestrator (if necessary)") + return True + + def deploy_vnf(self): + """ + Deploy the VNF + + This function MUST be implemented by vnf test cases. + The details section MAY be updated in the vnf test cases. + + The deployment can be executed via a specific orchestrator + or using build-in orchestrators such as heat, OpenBaton, cloudify, + juju, onap, ... + + Returns: + True if the VNF is properly deployed + False if the VNF is not deployed + + Raise VnfDeploymentException if error during VNF deployment + """ + self.__logger.error("VNF must be deployed") + raise VnfDeploymentException + + def test_vnf(self): + """ + Test the VNF + + This function MUST be implemented by vnf test cases. + The details section MAY be updated in the vnf test cases. + + Once a VNF is deployed, it is assumed that specific test suite can be + run to validate the VNF. + Please note that the same test suite can be used on several test case + (e.g. clearwater test suite can be used whatever the orchestrator used + for the deployment) + + Returns: + True if VNF tests are PASS + False if test suite is FAIL + + Raise VnfTestException if error during VNF test + """ + self.__logger.error("VNF must be tested") + raise VnfTestException + + def clean(self): + """ + Clean VNF test case. + + It is up to the test providers to delete resources used for the tests. + By default we clean: + + * the user, + * the tenant + """ + self.__logger.info('Removing the VNF resources ..') + for creator in reversed(self.created_object): + try: + creator.clean() + except Exception as exc: # pylint: disable=broad-except + self.__logger.error('Unexpected error cleaning - %s', exc) diff --git a/xtesting/energy/__init__.py b/xtesting/energy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/energy/energy.py b/xtesting/energy/energy.py new file mode 100644 index 00000000..76e4873c --- /dev/null +++ b/xtesting/energy/energy.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +# Copyright (c) 2017 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""This module manages calls to Energy recording API.""" + +import json +import logging +import traceback + +from functools import wraps +import requests +from six.moves import urllib + +from xtesting.utils import env + + +def finish_session(current_scenario): + """Finish a recording session.""" + if current_scenario is None: + EnergyRecorder.stop() + else: + EnergyRecorder.logger.debug("Restoring previous scenario (%s/%s)", + current_scenario["scenario"], + current_scenario["step"]) + EnergyRecorder.submit_scenario( + current_scenario["scenario"], + current_scenario["step"] + ) + + +def enable_recording(method): + """ + Record energy during method execution. + + Decorator to record energy during "method" exection. + + param method: Method to suround with start and stop + :type method: function + + .. note:: "method" should belong to a class having a "case_name" + attribute + """ + @wraps(method) + def wrapper(*args): + """ + Record energy during method execution (implementation). + + Wrapper for decorator to handle method arguments. + """ + current_scenario = EnergyRecorder.get_current_scenario() + EnergyRecorder.start(args[0].case_name) + try: + return_value = method(*args) + finish_session(current_scenario) + except Exception as exc: # pylint: disable=broad-except + EnergyRecorder.logger.exception(exc) + finish_session(current_scenario) + raise exc + return return_value + return wrapper + + +# Class to manage energy recording sessions +class EnergyRecorder(object): + """Manage Energy recording session.""" + + logger = logging.getLogger(__name__) + # Energy recording API connectivity settings + # see load_config method + energy_recorder_api = None + + # Default initial step + INITIAL_STEP = "running" + + # Default connection timeout + CONNECTION_TIMEOUT = 4 + + @staticmethod + def load_config(): + """ + Load connectivity settings from yaml. + + Load connectivity settings to Energy recording API + """ + # Singleton pattern for energy_recorder_api static member + # Load only if not previouly done + if EnergyRecorder.energy_recorder_api is None: + assert env.get('NODE_NAME') + assert env.get('ENERGY_RECORDER_API_URL') + environment = env.get('NODE_NAME') + energy_recorder_uri = env.get( + 'ENERGY_RECORDER_API_URL') + + # Creds + creds_usr = env.get("ENERGY_RECORDER_API_USER") + creds_pass = env.get("ENERGY_RECORDER_API_PASSWORD") + + uri_comp = "/recorders/environment/" + uri_comp += urllib.parse.quote_plus(environment) + + if creds_usr and creds_pass: + energy_recorder_api_auth = (creds_usr, creds_pass) + else: + energy_recorder_api_auth = None + + try: + resp = requests.get(energy_recorder_uri + "/monitoring/ping", + auth=energy_recorder_api_auth, + headers={ + 'content-type': 'application/json' + }, + timeout=EnergyRecorder.CONNECTION_TIMEOUT) + api_available = json.loads(resp.text)["status"] == "OK" + EnergyRecorder.logger.info( + "API recorder available at : %s", + energy_recorder_uri + uri_comp) + except Exception as exc: # pylint: disable=broad-except + EnergyRecorder.logger.info( + "Energy recorder API is not available, cause=%s", + str(exc)) + api_available = False + # Final config + EnergyRecorder.energy_recorder_api = { + "uri": energy_recorder_uri + uri_comp, + "auth": energy_recorder_api_auth, + "available": api_available + } + return EnergyRecorder.energy_recorder_api["available"] + + @staticmethod + def submit_scenario(scenario, step): + """ + Submit a complet scenario definition to Energy recorder API. + + param scenario: Scenario name + :type scenario: string + param step: Step name + :type step: string + """ + try: + return_status = True + # Ensure that connectyvity settings are loaded + if EnergyRecorder.load_config(): + EnergyRecorder.logger.debug("Submitting scenario (%s/%s)", + scenario, step) + + # Create API payload + payload = { + "step": step, + "scenario": scenario + } + # Call API to start energy recording + response = requests.post( + EnergyRecorder.energy_recorder_api["uri"], + data=json.dumps(payload), + auth=EnergyRecorder.energy_recorder_api["auth"], + headers={ + 'content-type': 'application/json' + }, + timeout=EnergyRecorder.CONNECTION_TIMEOUT + ) + if response.status_code != 200: + EnergyRecorder.logger.error( + "Error while submitting scenario\n%s", + response.text) + return_status = False + except requests.exceptions.ConnectionError: + EnergyRecorder.logger.warning( + "submit_scenario: Unable to connect energy recorder API") + return_status = False + except Exception: # pylint: disable=broad-except + # Default exception handler to ensure that method + # is safe for caller + EnergyRecorder.logger.info( + "Error while submitting scenarion to energy recorder API\n%s", + traceback.format_exc() + ) + return_status = False + return return_status + + @staticmethod + def start(scenario): + """ + Start a recording session for scenario. + + param scenario: Starting scenario + :type scenario: string + """ + return_status = True + try: + if EnergyRecorder.load_config(): + EnergyRecorder.logger.debug("Starting recording") + return_status = EnergyRecorder.submit_scenario( + scenario, + EnergyRecorder.INITIAL_STEP + ) + + except Exception: # pylint: disable=broad-except + # Default exception handler to ensure that method + # is safe for caller + EnergyRecorder.logger.info( + "Error while starting energy recorder API\n%s", + traceback.format_exc() + ) + return_status = False + return return_status + + @staticmethod + def stop(): + """Stop current recording session.""" + return_status = True + try: + # Ensure that connectyvity settings are loaded + if EnergyRecorder.load_config(): + EnergyRecorder.logger.debug("Stopping recording") + + # Call API to stop energy recording + response = requests.delete( + EnergyRecorder.energy_recorder_api["uri"], + auth=EnergyRecorder.energy_recorder_api["auth"], + headers={ + 'content-type': 'application/json' + }, + timeout=EnergyRecorder.CONNECTION_TIMEOUT + ) + if response.status_code != 200: + EnergyRecorder.logger.error( + "Error while stating energy recording session\n%s", + response.text) + return_status = False + except requests.exceptions.ConnectionError: + EnergyRecorder.logger.warning( + "stop: Unable to connect energy recorder API") + return_status = False + except Exception: # pylint: disable=broad-except + # Default exception handler to ensure that method + # is safe for caller + EnergyRecorder.logger.info( + "Error while stoping energy recorder API\n%s", + traceback.format_exc() + ) + return_status = False + return return_status + + @staticmethod + def set_step(step): + """Notify energy recording service of current step of the testcase.""" + return_status = True + try: + # Ensure that connectyvity settings are loaded + if EnergyRecorder.load_config(): + EnergyRecorder.logger.debug("Setting step") + + # Create API payload + payload = { + "step": step, + } + + # Call API to define step + response = requests.post( + EnergyRecorder.energy_recorder_api["uri"] + "/step", + data=json.dumps(payload), + auth=EnergyRecorder.energy_recorder_api["auth"], + headers={ + 'content-type': 'application/json' + }, + timeout=EnergyRecorder.CONNECTION_TIMEOUT + ) + if response.status_code != 200: + EnergyRecorder.logger.error( + "Error while setting current step of testcase\n%s", + response.text) + return_status = False + except requests.exceptions.ConnectionError: + EnergyRecorder.logger.warning( + "set_step: Unable to connect energy recorder API") + return_status = False + except Exception: # pylint: disable=broad-except + # Default exception handler to ensure that method + # is safe for caller + EnergyRecorder.logger.info( + "Error while setting step on energy recorder API\n%s", + traceback.format_exc() + ) + return_status = False + return return_status + + @staticmethod + def get_current_scenario(): + """Get current running scenario (if any, None else).""" + return_value = None + try: + # Ensure that connectyvity settings are loaded + if EnergyRecorder.load_config(): + EnergyRecorder.logger.debug("Getting current scenario") + + # Call API get running scenario + response = requests.get( + EnergyRecorder.energy_recorder_api["uri"], + auth=EnergyRecorder.energy_recorder_api["auth"], + timeout=EnergyRecorder.CONNECTION_TIMEOUT + ) + if response.status_code == 200: + return_value = json.loads(response.text) + elif response.status_code == 404: + EnergyRecorder.logger.info( + "No current running scenario at %s", + EnergyRecorder.energy_recorder_api["uri"]) + return_value = None + else: + EnergyRecorder.logger.error( + "Error while getting current scenario\n%s", + response.text) + return_value = None + except requests.exceptions.ConnectionError: + EnergyRecorder.logger.warning( + "get_currernt_sceario: Unable to connect energy recorder API") + return_value = None + except Exception: # pylint: disable=broad-except + # Default exception handler to ensure that method + # is safe for caller + EnergyRecorder.logger.info( + "Error while getting current scenario from energy recorder API" + "\n%s", traceback.format_exc() + ) + return_value = None + return return_value diff --git a/xtesting/tests/__init__.py b/xtesting/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/tests/unit/__init__.py b/xtesting/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/tests/unit/ci/__init__.py b/xtesting/tests/unit/ci/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/tests/unit/ci/test_run_tests.py b/xtesting/tests/unit/ci/test_run_tests.py new file mode 100644 index 00000000..de2af66d --- /dev/null +++ b/xtesting/tests/unit/ci/test_run_tests.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python + +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +# pylint: disable=missing-docstring + +import logging +import unittest +import os + +import mock + +from xtesting.ci import run_tests +from xtesting.core.testcase import TestCase + + +class FakeModule(TestCase): + + def run(self, **kwargs): + return TestCase.EX_OK + + +class RunTestsTesting(unittest.TestCase): + + def setUp(self): + self.runner = run_tests.Runner() + mock_test_case = mock.Mock() + mock_test_case.is_successful.return_value = TestCase.EX_OK + self.runner.executed_test_cases['test1'] = mock_test_case + self.runner.executed_test_cases['test2'] = mock_test_case + self.sep = 'test_sep' + self.creds = {'OS_AUTH_URL': 'http://test_ip:test_port/v2.0', + 'OS_USERNAME': 'test_os_username', + 'OS_TENANT_NAME': 'test_tenant', + 'OS_PASSWORD': 'test_password'} + self.test = {'test_name': 'test_name'} + self.tier = mock.Mock() + test1 = mock.Mock() + test1.get_name.return_value = 'test1' + test2 = mock.Mock() + test2.get_name.return_value = 'test2' + attrs = {'get_name.return_value': 'test_tier', + 'get_tests.return_value': [test1, test2], + 'get_ci_loop.return_value': 'test_ci_loop', + 'get_test_names.return_value': ['test1', 'test2']} + self.tier.configure_mock(**attrs) + + self.tiers = mock.Mock() + attrs = {'get_tiers.return_value': [self.tier]} + self.tiers.configure_mock(**attrs) + + self.run_tests_parser = run_tests.RunTestsParser() + + @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test') + def test_get_run_dict(self, *args): + retval = {'run': mock.Mock()} + args[0].return_value = retval + self.assertEqual(self.runner.get_run_dict('test_name'), retval['run']) + args[0].assert_called_once_with('test_name') + + @mock.patch('xtesting.ci.run_tests.LOGGER.error') + @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test', + return_value=None) + def test_get_run_dict_config_ko(self, *args): + testname = 'test_name' + self.assertEqual(self.runner.get_run_dict(testname), None) + args[0].return_value = {} + self.assertEqual(self.runner.get_run_dict(testname), None) + calls = [mock.call(testname), mock.call(testname)] + args[0].assert_has_calls(calls) + calls = [mock.call("Cannot get %s's config options", testname), + mock.call("Cannot get %s's config options", testname)] + args[1].assert_has_calls(calls) + + @mock.patch('xtesting.ci.run_tests.LOGGER.exception') + @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test', + side_effect=Exception) + def test_get_run_dict_exception(self, *args): + testname = 'test_name' + self.assertEqual(self.runner.get_run_dict(testname), None) + args[1].assert_called_once_with( + "Cannot get %s's config options", testname) + + def _test_source_envfile(self, msg, key='OS_TENANT_NAME', value='admin'): + try: + del os.environ[key] + except Exception: # pylint: disable=broad-except + pass + envfile = 'rc_file' + with mock.patch('six.moves.builtins.open', + mock.mock_open(read_data=msg)) as mock_method,\ + mock.patch('os.path.isfile', return_value=True): + mock_method.return_value.__iter__ = lambda self: iter( + self.readline, '') + self.runner.source_envfile(envfile) + mock_method.assert_called_once_with(envfile, 'r') + self.assertEqual(os.environ[key], value) + + def test_source_envfile(self): + self._test_source_envfile('OS_TENANT_NAME=admin') + self._test_source_envfile('OS_TENANT_NAME= admin') + self._test_source_envfile('OS_TENANT_NAME = admin') + self._test_source_envfile('OS_TENANT_NAME = "admin"') + self._test_source_envfile('export OS_TENANT_NAME=admin') + self._test_source_envfile('export OS_TENANT_NAME =admin') + self._test_source_envfile('export OS_TENANT_NAME = admin') + self._test_source_envfile('export OS_TENANT_NAME = "admin"') + # This test will fail as soon as rc_file is fixed + self._test_source_envfile( + 'export "\'OS_TENANT_NAME\'" = "\'admin\'"') + + def test_get_dict_by_test(self): + with mock.patch('six.moves.builtins.open', mock.mock_open()), \ + mock.patch('yaml.safe_load') as mock_yaml: + mock_obj = mock.Mock() + testcase_dict = {'case_name': 'testname', + 'criteria': 50} + attrs = {'get.return_value': [{'testcases': [testcase_dict]}]} + mock_obj.configure_mock(**attrs) + mock_yaml.return_value = mock_obj + self.assertDictEqual( + run_tests.Runner.get_dict_by_test('testname'), + testcase_dict) + + @mock.patch('xtesting.ci.run_tests.Runner.get_run_dict', + return_value=None) + def test_run_tests_import_exception(self, *args): + mock_test = mock.Mock() + kwargs = {'get_name.return_value': 'test_name', + 'needs_clean.return_value': False} + mock_test.configure_mock(**kwargs) + with self.assertRaises(Exception) as context: + self.runner.run_test(mock_test) + args[0].assert_called_with('test_name') + msg = "Cannot import the class for the test case." + self.assertTrue(msg in str(context.exception)) + + @mock.patch('importlib.import_module', name="module", + return_value=mock.Mock(test_class=mock.Mock( + side_effect=FakeModule))) + @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test') + def test_run_tests_default(self, *args): + mock_test = mock.Mock() + kwargs = {'get_name.return_value': 'test_name', + 'needs_clean.return_value': True} + mock_test.configure_mock(**kwargs) + test_run_dict = {'module': 'test_module', + 'class': 'test_class'} + with mock.patch('xtesting.ci.run_tests.Runner.get_run_dict', + return_value=test_run_dict): + self.runner.clean_flag = True + self.runner.run_test(mock_test) + args[0].assert_called_with('test_name') + args[1].assert_called_with('test_module') + self.assertEqual(self.runner.overall_result, + run_tests.Result.EX_OK) + + @mock.patch('xtesting.ci.run_tests.Runner.run_test', + return_value=TestCase.EX_OK) + def test_run_tier_default(self, *mock_methods): + self.assertEqual(self.runner.run_tier(self.tier), + run_tests.Result.EX_OK) + mock_methods[0].assert_called_with(mock.ANY) + + @mock.patch('xtesting.ci.run_tests.LOGGER.info') + def test_run_tier_missing_test(self, mock_logger_info): + self.tier.get_tests.return_value = None + self.assertEqual(self.runner.run_tier(self.tier), + run_tests.Result.EX_ERROR) + self.assertTrue(mock_logger_info.called) + + @mock.patch('xtesting.ci.run_tests.LOGGER.info') + @mock.patch('xtesting.ci.run_tests.Runner.run_tier') + @mock.patch('xtesting.ci.run_tests.Runner.summary') + def test_run_all_default(self, *mock_methods): + os.environ['CI_LOOP'] = 'test_ci_loop' + self.runner.run_all() + mock_methods[1].assert_not_called() + self.assertTrue(mock_methods[2].called) + + @mock.patch('xtesting.ci.run_tests.LOGGER.info') + @mock.patch('xtesting.ci.run_tests.Runner.summary') + def test_run_all_missing_tier(self, *mock_methods): + os.environ['CI_LOOP'] = 'loop_re_not_available' + self.runner.run_all() + self.assertTrue(mock_methods[1].called) + + @mock.patch('xtesting.ci.run_tests.Runner.source_envfile', + side_effect=Exception) + @mock.patch('xtesting.ci.run_tests.Runner.summary') + def test_main_failed(self, *mock_methods): + kwargs = {'test': 'test_name', 'noclean': True, 'report': True} + args = {'get_tier.return_value': False, + 'get_test.return_value': False} + self.runner.tiers = mock.Mock() + self.runner.tiers.configure_mock(**args) + self.assertEqual(self.runner.main(**kwargs), + run_tests.Result.EX_ERROR) + mock_methods[1].assert_called_once_with() + + @mock.patch('xtesting.ci.run_tests.Runner.source_envfile') + @mock.patch('xtesting.ci.run_tests.Runner.run_test', + return_value=TestCase.EX_OK) + @mock.patch('xtesting.ci.run_tests.Runner.summary') + def test_main_tier(self, *mock_methods): + mock_tier = mock.Mock() + test_mock = mock.Mock() + test_mock.get_name.return_value = 'test1' + args = {'get_name.return_value': 'tier_name', + 'get_tests.return_value': [test_mock]} + mock_tier.configure_mock(**args) + kwargs = {'test': 'tier_name', 'noclean': True, 'report': True} + args = {'get_tier.return_value': mock_tier, + 'get_test.return_value': None} + self.runner.tiers = mock.Mock() + self.runner.tiers.configure_mock(**args) + self.assertEqual(self.runner.main(**kwargs), + run_tests.Result.EX_OK) + mock_methods[1].assert_called() + + @mock.patch('xtesting.ci.run_tests.Runner.source_envfile') + @mock.patch('xtesting.ci.run_tests.Runner.run_test', + return_value=TestCase.EX_OK) + def test_main_test(self, *mock_methods): + kwargs = {'test': 'test_name', 'noclean': True, 'report': True} + args = {'get_tier.return_value': None, + 'get_test.return_value': 'test_name'} + self.runner.tiers = mock.Mock() + mock_methods[1].return_value = self.creds + self.runner.tiers.configure_mock(**args) + self.assertEqual(self.runner.main(**kwargs), + run_tests.Result.EX_OK) + mock_methods[0].assert_called_once_with('test_name') + + @mock.patch('xtesting.ci.run_tests.Runner.source_envfile') + @mock.patch('xtesting.ci.run_tests.Runner.run_all') + @mock.patch('xtesting.ci.run_tests.Runner.summary') + def test_main_all_tier(self, *args): + kwargs = {'get_tier.return_value': None, + 'get_test.return_value': None} + self.runner.tiers = mock.Mock() + self.runner.tiers.configure_mock(**kwargs) + self.assertEqual( + self.runner.main(test='all', noclean=True, report=True), + run_tests.Result.EX_OK) + args[0].assert_called_once_with(None) + args[1].assert_called_once_with() + args[2].assert_called_once_with() + + @mock.patch('xtesting.ci.run_tests.Runner.source_envfile') + def test_main_any_tier_test_ko(self, *args): + kwargs = {'get_tier.return_value': None, + 'get_test.return_value': None} + self.runner.tiers = mock.Mock() + self.runner.tiers.configure_mock(**kwargs) + self.assertEqual( + self.runner.main(test='any', noclean=True, report=True), + run_tests.Result.EX_ERROR) + args[0].assert_called_once_with() + + +if __name__ == "__main__": + logging.disable(logging.CRITICAL) + unittest.main(verbosity=2) diff --git a/xtesting/tests/unit/ci/test_tier_builder.py b/xtesting/tests/unit/ci/test_tier_builder.py new file mode 100644 index 00000000..22a44a58 --- /dev/null +++ b/xtesting/tests/unit/ci/test_tier_builder.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python + +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +# pylint: disable=missing-docstring + +import logging +import unittest + +import mock + +from xtesting.ci import tier_builder + + +class TierBuilderTesting(unittest.TestCase): + + def setUp(self): + self.dependency = { + 'installer': 'test_installer', 'scenario': 'test_scenario'} + self.testcase = { + 'dependencies': self.dependency, 'enabled': 'true', + 'case_name': 'test_name', 'criteria': 'test_criteria', + 'blocking': 'test_blocking', 'description': 'test_desc', + 'project_name': 'project_name'} + self.dic_tier = { + 'name': 'test_tier', 'order': 'test_order', + 'ci_loop': 'test_ci_loop', 'description': 'test_desc', + 'testcases': [self.testcase]} + self.mock_yaml = mock.Mock() + attrs = {'get.return_value': [self.dic_tier]} + self.mock_yaml.configure_mock(**attrs) + + with mock.patch('xtesting.ci.tier_builder.yaml.safe_load', + return_value=self.mock_yaml), \ + mock.patch('six.moves.builtins.open', mock.mock_open()): + self.tierbuilder = tier_builder.TierBuilder( + 'test_installer', 'test_scenario', 'testcases_file') + self.tier_obj = self.tierbuilder.tier_objects[0] + + def test_get_tiers(self): + self.assertEqual(self.tierbuilder.get_tiers(), + [self.tier_obj]) + + def test_get_tier_names(self): + self.assertEqual(self.tierbuilder.get_tier_names(), + ['test_tier']) + + def test_get_tier_present_tier(self): + self.assertEqual(self.tierbuilder.get_tier('test_tier'), + self.tier_obj) + + def test_get_tier_missing_tier(self): + self.assertEqual(self.tierbuilder.get_tier('test_tier2'), + None) + + def test_get_test_present_test(self): + self.assertEqual(self.tierbuilder.get_test('test_name'), + self.tier_obj.get_test('test_name')) + + def test_get_test_missing_test(self): + self.assertEqual(self.tierbuilder.get_test('test_name2'), + None) + + def test_get_tests_present_tier(self): + self.assertEqual(self.tierbuilder.get_tests('test_tier'), + self.tier_obj.tests_array) + + def test_get_tests_missing_tier(self): + self.assertEqual(self.tierbuilder.get_tests('test_tier2'), + None) + + def test_get_tier_name_ok(self): + self.assertEqual(self.tierbuilder.get_tier_name('test_name'), + 'test_tier') + + def test_get_tier_name_ko(self): + self.assertEqual(self.tierbuilder.get_tier_name('test_name2'), None) + + def test_str(self): + message = str(self.tierbuilder) + self.assertTrue('test_tier' in message) + self.assertTrue('test_order' in message) + self.assertTrue('test_ci_loop' in message) + self.assertTrue('test_desc' in message) + self.assertTrue('test_name' in message) + + +if __name__ == "__main__": + logging.disable(logging.CRITICAL) + unittest.main(verbosity=2) diff --git a/xtesting/tests/unit/ci/test_tier_handler.py b/xtesting/tests/unit/ci/test_tier_handler.py new file mode 100644 index 00000000..d1900103 --- /dev/null +++ b/xtesting/tests/unit/ci/test_tier_handler.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python + +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +# pylint: disable=missing-docstring + +import logging +import unittest + +import mock + +from xtesting.ci import tier_handler + + +class TierHandlerTesting(unittest.TestCase): + # pylint: disable=too-many-public-methods + + def setUp(self): + self.test = mock.Mock() + attrs = {'get_name.return_value': 'test_name'} + self.test.configure_mock(**attrs) + self.mock_depend = mock.Mock() + attrs = {'get_scenario.return_value': 'test_scenario', + 'get_installer.return_value': 'test_installer'} + self.mock_depend.configure_mock(**attrs) + self.tier = tier_handler.Tier( + 'test_tier', 'test_order', 'test_ci_loop', description='test_desc') + self.testcase = tier_handler.TestCase( + 'test_name', 'true', self.mock_depend, 'test_criteria', + True, description='test_desc', project='project_name') + self.dependency = tier_handler.Dependency( + 'test_installer', 'test_scenario') + self.testcase.str = self.testcase.__str__() + self.dependency.str = self.dependency.__str__() + self.tier.str = self.tier.__str__() + + def test_split_text(self): + test_str = 'this is for testing' + self.assertEqual(tier_handler.split_text(test_str, 10), + ['this is ', 'for ', 'testing ']) + + def test_add_test(self): + self.tier.add_test(self.test) + self.assertEqual(self.tier.tests_array, [self.test]) + + def test_get_skipped_test1(self): + self.assertEqual(self.tier.get_skipped_test(), []) + + def test_get_skipped_test2(self): + self.tier.skip_test(self.test) + self.assertEqual(self.tier.get_skipped_test(), [self.test]) + + def test_get_tests(self): + self.tier.tests_array = [self.test] + self.assertEqual(self.tier.get_tests(), [self.test]) + + def test_get_test_names(self): + self.tier.tests_array = [self.test] + self.assertEqual(self.tier.get_test_names(), ['test_name']) + + def test_get_test(self): + self.tier.tests_array = [self.test] + with mock.patch.object(self.tier, 'is_test', return_value=True): + self.assertEqual(self.tier.get_test('test_name'), self.test) + + def test_get_test_missing_test(self): + self.tier.tests_array = [self.test] + with mock.patch.object(self.tier, 'is_test', return_value=False): + self.assertEqual(self.tier.get_test('test_name'), None) + + def test_get_name(self): + self.assertEqual(self.tier.get_name(), 'test_tier') + + def test_get_order(self): + self.assertEqual(self.tier.get_order(), 'test_order') + + def test_get_ci_loop(self): + self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop') + + def test_testcase_is_none_in_item(self): + self.assertEqual(tier_handler.TestCase.is_none("item"), False) + + def test_testcase_is_none_no_item(self): + self.assertEqual(tier_handler.TestCase.is_none(None), True) + + def test_testcase_is_compatible(self): + self.assertEqual( + self.testcase.is_compatible('test_installer', 'test_scenario'), + True) + + def test_testcase_is_compatible_2(self): + self.assertEqual( + self.testcase.is_compatible('missing_installer', 'test_scenario'), + False) + self.assertEqual( + self.testcase.is_compatible('test_installer', 'missing_scenario'), + False) + + @mock.patch('re.search', side_effect=TypeError) + def test_testcase_is_compatible3(self, *args): + self.assertEqual( + self.testcase.is_compatible('test_installer', 'test_scenario'), + False) + args[0].assert_called_once_with('test_installer', 'test_installer') + + def test_testcase_get_name(self): + self.assertEqual(self.tier.get_name(), 'test_tier') + + def test_testcase_is_enabled(self): + self.assertEqual(self.testcase.is_enabled(), 'true') + + def test_testcase_get_criteria(self): + self.assertEqual(self.testcase.get_criteria(), 'test_criteria') + + def test_testcase_is_blocking(self): + self.assertTrue(self.testcase.is_blocking()) + + def test_testcase_get_project(self): + self.assertEqual(self.testcase.get_project(), 'project_name') + + def test_testcase_get_order(self): + self.assertEqual(self.tier.get_order(), 'test_order') + + def test_testcase_get_ci_loop(self): + self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop') + + def test_dependency_get_installer(self): + self.assertEqual(self.dependency.get_installer(), 'test_installer') + + def test_dependency_get_scenario(self): + self.assertEqual(self.dependency.get_scenario(), 'test_scenario') + + +if __name__ == "__main__": + logging.disable(logging.CRITICAL) + unittest.main(verbosity=2) diff --git a/xtesting/tests/unit/core/__init__.py b/xtesting/tests/unit/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/xtesting/tests/unit/core/test_feature.py b/xtesting/tests/unit/core/test_feature.py new file mode 100644 index 00000000..9bbe5331 --- /dev/null +++ b/xtesting/tests/unit/core/test_feature.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +# Copyright (c) 2017 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +# pylint: disable=missing-docstring + +import logging +import unittest + +import mock + +from xtesting.core import feature +from xtesting.core import testcase + + +class FeatureTestingBase(unittest.TestCase): + + _case_name = "foo" + _project_name = "bar" + _repo = "dir_repo_bar" + _cmd = "run_bar_tests.py" + _output_file = '/home/opnfv/xtesting/results/foo.log' + feature = None + + @mock.patch('time.time', side_effect=[1, 2]) + def _test_run(self, status, mock_method=None): + self.assertEqual(self.feature.run(cmd=self._cmd), status) + if status == testcase.TestCase.EX_OK: + self.assertEqual(self.feature.result, 100) + else: + self.assertEqual(self.feature.result, 0) + mock_method.assert_has_calls([mock.call(), mock.call()]) + self.assertEqual(self.feature.start_time, 1) + self.assertEqual(self.feature.stop_time, 2) + + def test_logger_module_ko(self): + with mock.patch('six.moves.builtins.open'): + self.feature = feature.Feature( + project_name=self._project_name, case_name=self._case_name) + self.assertEqual(self.feature.logger.name, self._case_name) + + def test_logger_module(self): + with mock.patch('six.moves.builtins.open'): + self.feature = feature.Feature( + project_name=self._project_name, case_name=self._case_name, + run={'module': 'bar'}) + self.assertEqual(self.feature.logger.name, 'bar') + + +class FeatureTesting(FeatureTestingBase): + + def setUp(self): + # logging must be disabled else it calls time.time() + # what will break these unit tests. + logging.disable(logging.CRITICAL) + with mock.patch('six.moves.builtins.open'): + self.feature = feature.Feature( + project_name=self._project_name, case_name=self._case_name) + + def test_run_exc(self): + # pylint: disable=bad-continuation + with mock.patch.object( + self.feature, 'execute', + side_effect=Exception) as mock_method: + self._test_run(testcase.TestCase.EX_RUN_ERROR) + mock_method.assert_called_once_with(cmd=self._cmd) + + def test_run(self): + self._test_run(testcase.TestCase.EX_RUN_ERROR) + + +class BashFeatureTesting(FeatureTestingBase): + + def setUp(self): + # logging must be disabled else it calls time.time() + # what will break these unit tests. + logging.disable(logging.CRITICAL) + with mock.patch('six.moves.builtins.open'): + self.feature = feature.BashFeature( + project_name=self._project_name, case_name=self._case_name) + + @mock.patch('subprocess.Popen') + def test_run_no_cmd(self, mock_subproc): + self.assertEqual( + self.feature.run(), testcase.TestCase.EX_RUN_ERROR) + mock_subproc.assert_not_called() + + @mock.patch('subprocess.Popen') + def test_run_ko(self, mock_subproc): + with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen: + mock_obj = mock.Mock() + attrs = {'wait.return_value': 1} + mock_obj.configure_mock(**attrs) + + mock_subproc.return_value = mock_obj + self._test_run(testcase.TestCase.EX_RUN_ERROR) + mopen.assert_called_once_with(self._output_file, "w+") + + @mock.patch('subprocess.Popen') + def test_run(self, mock_subproc): + with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen: + mock_obj = mock.Mock() + attrs = {'wait.return_value': 0} + mock_obj.configure_mock(**attrs) + + mock_subproc.return_value = mock_obj + self._test_run(testcase.TestCase.EX_OK) + mopen.assert_called_once_with(self._output_file, "w+") + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/xtesting/tests/unit/core/test_robotframework.py b/xtesting/tests/unit/core/test_robotframework.py new file mode 100644 index 00000000..7131b7e2 --- /dev/null +++ b/xtesting/tests/unit/core/test_robotframework.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python + +# Copyright (c) 2017 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define the classes required to fully cover robot.""" + +import errno +import logging +import os +import unittest + +import mock +from robot.errors import DataError, RobotError +from robot.result import model +from robot.utils.robottime import timestamp_to_secs + +from xtesting.core import robotframework + +__author__ = "Cedric Ollivier " + + +class ResultVisitorTesting(unittest.TestCase): + + """The class testing ResultVisitor.""" + # pylint: disable=missing-docstring + + def setUp(self): + self.visitor = robotframework.ResultVisitor() + + def test_empty(self): + self.assertFalse(self.visitor.get_data()) + + def test_ok(self): + data = {'name': 'foo', + 'parent': 'bar', + 'status': 'PASS', + 'starttime': "20161216 16:00:00.000", + 'endtime': "20161216 16:00:01.000", + 'elapsedtime': 1000, + 'text': 'Hello, World!', + 'critical': True} + test = model.TestCase( + name=data['name'], status=data['status'], message=data['text'], + starttime=data['starttime'], endtime=data['endtime']) + test.parent = mock.Mock() + config = {'name': data['parent'], + 'criticality.test_is_critical.return_value': data[ + 'critical']} + test.parent.configure_mock(**config) + self.visitor.visit_test(test) + self.assertEqual(self.visitor.get_data(), [data]) + + +class ParseResultTesting(unittest.TestCase): + + """The class testing RobotFramework.parse_results().""" + # pylint: disable=missing-docstring + + _config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000', + 'endtime': '20161216 16:00:01.000'} + + def setUp(self): + self.test = robotframework.RobotFramework( + case_name='robot', project_name='xtesting') + + @mock.patch('robot.api.ExecutionResult', side_effect=DataError) + def test_raises_exc(self, mock_method): + with self.assertRaises(DataError): + self.test.parse_results() + mock_method.assert_called_once_with( + os.path.join(self.test.res_dir, 'output.xml')) + + def _test_result(self, config, result): + suite = mock.Mock() + suite.configure_mock(**config) + with mock.patch('robot.api.ExecutionResult', + return_value=mock.Mock(suite=suite)): + self.test.parse_results() + self.assertEqual(self.test.result, result) + self.assertEqual(self.test.start_time, + timestamp_to_secs(config['starttime'])) + self.assertEqual(self.test.stop_time, + timestamp_to_secs(config['endtime'])) + self.assertEqual(self.test.details, + {'description': config['name'], 'tests': []}) + + def test_null_passed(self): + self._config.update({'statistics.critical.passed': 0, + 'statistics.critical.total': 20}) + self._test_result(self._config, 0) + + def test_no_test(self): + self._config.update({'statistics.critical.passed': 20, + 'statistics.critical.total': 0}) + self._test_result(self._config, 0) + + def test_half_success(self): + self._config.update({'statistics.critical.passed': 10, + 'statistics.critical.total': 20}) + self._test_result(self._config, 50) + + def test_success(self): + self._config.update({'statistics.critical.passed': 20, + 'statistics.critical.total': 20}) + self._test_result(self._config, 100) + + +class RunTesting(unittest.TestCase): + + """The class testing RobotFramework.run().""" + # pylint: disable=missing-docstring + + suites = ["foo"] + variable = [] + variablefile = [] + + def setUp(self): + self.test = robotframework.RobotFramework( + case_name='robot', project_name='xtesting') + + def test_exc_key_error(self): + self.assertEqual(self.test.run(), self.test.EX_RUN_ERROR) + + @mock.patch('robot.run') + def _test_makedirs_exc(self, *args): + with mock.patch.object(self.test, 'parse_results') as mock_method: + self.assertEqual( + self.test.run( + suites=self.suites, variable=self.variable, + variablefile=self.variablefile), + self.test.EX_RUN_ERROR) + args[0].assert_not_called() + mock_method.asser_not_called() + + @mock.patch('os.makedirs', side_effect=Exception) + def test_makedirs_exc(self, *args): + self._test_makedirs_exc() + args[0].assert_called_once_with(self.test.res_dir) + + @mock.patch('os.makedirs', side_effect=OSError) + def test_makedirs_oserror(self, *args): + self._test_makedirs_exc() + args[0].assert_called_once_with(self.test.res_dir) + + @mock.patch('robot.run') + def _test_makedirs(self, *args): + with mock.patch.object(self.test, 'parse_results') as mock_method: + self.assertEqual( + self.test.run(suites=self.suites, variable=self.variable), + self.test.EX_OK) + args[0].assert_called_once_with( + *self.suites, log='NONE', output=self.test.xml_file, + report='NONE', stdout=mock.ANY, variable=self.variable, + variablefile=self.variablefile) + mock_method.assert_called_once_with() + + @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, '')) + def test_makedirs_oserror17(self, *args): + self._test_makedirs() + args[0].assert_called_once_with(self.test.res_dir) + + @mock.patch('os.makedirs') + def test_makedirs(self, *args): + self._test_makedirs() + args[0].assert_called_once_with(self.test.res_dir) + + @mock.patch('robot.run') + def _test_parse_results(self, status, *args): + self.assertEqual( + self.test.run( + suites=self.suites, variable=self.variable, + variablefile=self.variablefile), + status) + args[0].assert_called_once_with( + *self.suites, log='NONE', output=self.test.xml_file, + report='NONE', stdout=mock.ANY, variable=self.variable, + variablefile=self.variablefile) + + def test_parse_results_exc(self): + with mock.patch.object(self.test, 'parse_results', + side_effect=Exception) as mock_method: + self._test_parse_results(self.test.EX_RUN_ERROR) + mock_method.assert_called_once_with() + + def test_parse_results_robot_error(self): + with mock.patch.object(self.test, 'parse_results', + side_effect=RobotError('foo')) as mock_method: + self._test_parse_results(self.test.EX_RUN_ERROR) + mock_method.assert_called_once_with() + + +if __name__ == "__main__": + logging.disable(logging.CRITICAL) + unittest.main(verbosity=2) diff --git a/xtesting/tests/unit/core/test_testcase.py b/xtesting/tests/unit/core/test_testcase.py new file mode 100644 index 00000000..e2f56f8f --- /dev/null +++ b/xtesting/tests/unit/core/test_testcase.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python + +# Copyright (c) 2016 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Define the class required to fully cover testcase.""" + +from datetime import datetime +import json +import logging +import os +import unittest + +from xtesting.core import testcase + +import mock +import requests + + +__author__ = "Cedric Ollivier " + + +class TestCaseTesting(unittest.TestCase): + """The class testing TestCase.""" + + # pylint: disable=missing-docstring,too-many-public-methods + + _case_name = "base" + _project_name = "xtesting" + _published_result = "PASS" + _test_db_url = "http://testresults.opnfv.org/test/api/v1/results" + _headers = {'Content-Type': 'application/json'} + + def setUp(self): + self.test = testcase.TestCase(case_name=self._case_name, + project_name=self._project_name) + self.test.start_time = 1 + self.test.stop_time = 2 + self.test.result = 100 + self.test.details = {"Hello": "World"} + os.environ['TEST_DB_URL'] = TestCaseTesting._test_db_url + os.environ['INSTALLER_TYPE'] = "installer_type" + os.environ['DEPLOY_SCENARIO'] = "scenario" + os.environ['NODE_NAME'] = "node_name" + os.environ['BUILD_TAG'] = "foo-daily-master-bar" + + def test_run_unimplemented(self): + self.assertEqual(self.test.run(), + testcase.TestCase.EX_RUN_ERROR) + + def _test_pushdb_missing_attribute(self): + self.assertEqual(self.test.push_to_db(), + testcase.TestCase.EX_PUSH_TO_DB_ERROR) + + def test_pushdb_no_project_name(self): + self.test.project_name = None + self._test_pushdb_missing_attribute() + + def test_pushdb_no_case_name(self): + self.test.case_name = None + self._test_pushdb_missing_attribute() + + def test_pushdb_no_start_time(self): + self.test.start_time = None + self._test_pushdb_missing_attribute() + + def test_pushdb_no_stop_time(self): + self.test.stop_time = None + self._test_pushdb_missing_attribute() + + def _test_pushdb_missing_env(self, var): + del os.environ[var] + self.assertEqual(self.test.push_to_db(), + testcase.TestCase.EX_PUSH_TO_DB_ERROR) + + def test_pushdb_no_db_url(self): + self._test_pushdb_missing_env('TEST_DB_URL') + + def test_pushdb_no_installer_type(self): + self._test_pushdb_missing_env('INSTALLER_TYPE') + + def test_pushdb_no_deploy_scenario(self): + self._test_pushdb_missing_env('DEPLOY_SCENARIO') + + def test_pushdb_no_node_name(self): + self._test_pushdb_missing_env('NODE_NAME') + + def test_pushdb_no_build_tag(self): + self._test_pushdb_missing_env('BUILD_TAG') + + @mock.patch('requests.post') + def test_pushdb_bad_start_time(self, mock_function=None): + self.test.start_time = "1" + self.assertEqual( + self.test.push_to_db(), + testcase.TestCase.EX_PUSH_TO_DB_ERROR) + mock_function.assert_not_called() + + @mock.patch('requests.post') + def test_pushdb_bad_end_time(self, mock_function=None): + self.test.stop_time = "2" + self.assertEqual( + self.test.push_to_db(), + testcase.TestCase.EX_PUSH_TO_DB_ERROR) + mock_function.assert_not_called() + + def _get_data(self): + return { + "build_tag": os.environ['BUILD_TAG'], + "case_name": self._case_name, + "criteria": 'PASS' if self.test.is_successful( + ) == self.test.EX_OK else 'FAIL', + "details": self.test.details, + "installer": os.environ['INSTALLER_TYPE'], + "pod_name": os.environ['NODE_NAME'], + "project_name": self.test.project_name, + "scenario": os.environ['DEPLOY_SCENARIO'], + "start_date": datetime.fromtimestamp( + self.test.start_time).strftime('%Y-%m-%d %H:%M:%S'), + "stop_date": datetime.fromtimestamp( + self.test.stop_time).strftime('%Y-%m-%d %H:%M:%S'), + "version": "master"} + + @mock.patch('requests.post') + def _test_pushdb_version(self, mock_function=None, **kwargs): + payload = self._get_data() + payload["version"] = kwargs.get("version", "unknown") + self.assertEqual(self.test.push_to_db(), testcase.TestCase.EX_OK) + mock_function.assert_called_once_with( + os.environ['TEST_DB_URL'], + data=json.dumps(payload, sort_keys=True), + headers=self._headers) + + def test_pushdb_daily_job(self): + self._test_pushdb_version(version="master") + + def test_pushdb_weekly_job(self): + os.environ['BUILD_TAG'] = 'foo-weekly-master-bar' + self._test_pushdb_version(version="master") + + def test_pushdb_random_build_tag(self): + os.environ['BUILD_TAG'] = 'whatever' + self._test_pushdb_version(version="unknown") + + @mock.patch('requests.post', return_value=mock.Mock( + raise_for_status=mock.Mock( + side_effect=requests.exceptions.HTTPError))) + def test_pushdb_http_errors(self, mock_function=None): + self.assertEqual( + self.test.push_to_db(), + testcase.TestCase.EX_PUSH_TO_DB_ERROR) + mock_function.assert_called_once_with( + os.environ['TEST_DB_URL'], + data=json.dumps(self._get_data(), sort_keys=True), + headers=self._headers) + + def test_check_criteria_missing(self): + self.test.criteria = None + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_check_result_missing(self): + self.test.result = None + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_check_result_failed(self): + # Backward compatibility + # It must be removed as soon as TestCase subclasses + # stop setting result = 'PASS' or 'FAIL'. + self.test.result = 'FAIL' + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_check_result_pass(self): + # Backward compatibility + # It must be removed as soon as TestCase subclasses + # stop setting result = 'PASS' or 'FAIL'. + self.test.result = 'PASS' + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_OK) + + def test_check_result_lt(self): + self.test.result = 50 + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_check_result_eq(self): + self.test.result = 100 + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_OK) + + def test_check_result_gt(self): + self.test.criteria = 50 + self.test.result = 100 + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_OK) + + def test_check_result_zero(self): + self.test.criteria = 0 + self.test.result = 0 + self.assertEqual(self.test.is_successful(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_get_duration_start_ko(self): + self.test.start_time = None + self.assertEqual(self.test.get_duration(), "XX:XX") + self.test.start_time = 0 + self.assertEqual(self.test.get_duration(), "XX:XX") + + def test_get_duration_end_ko(self): + self.test.stop_time = None + self.assertEqual(self.test.get_duration(), "XX:XX") + self.test.stop_time = 0 + self.assertEqual(self.test.get_duration(), "XX:XX") + + def test_get_invalid_duration(self): + self.test.start_time = 2 + self.test.stop_time = 1 + self.assertEqual(self.test.get_duration(), "XX:XX") + + def test_get_zero_duration(self): + self.test.start_time = 2 + self.test.stop_time = 2 + self.assertEqual(self.test.get_duration(), "00:00") + + def test_get_duration(self): + self.test.start_time = 1 + self.test.stop_time = 180 + self.assertEqual(self.test.get_duration(), "02:59") + + def test_str_project_name_ko(self): + self.test.project_name = None + self.assertIn("