aboutsummaryrefslogtreecommitdiffstats
path: root/xtesting
diff options
context:
space:
mode:
Diffstat (limited to 'xtesting')
-rw-r--r--xtesting/__init__.py0
-rw-r--r--xtesting/ci/__init__.py0
-rw-r--r--xtesting/ci/logging.ini65
-rw-r--r--xtesting/ci/run_tests.py302
-rw-r--r--xtesting/ci/testcases.yaml424
-rw-r--r--xtesting/ci/tier_builder.py106
-rw-r--r--xtesting/ci/tier_handler.py174
-rw-r--r--xtesting/core/__init__.py0
-rw-r--r--xtesting/core/feature.py133
-rw-r--r--xtesting/core/robotframework.py126
-rw-r--r--xtesting/core/testcase.py227
-rw-r--r--xtesting/core/unit.py92
-rw-r--r--xtesting/core/vnf.py205
-rw-r--r--xtesting/energy/__init__.py0
-rw-r--r--xtesting/energy/energy.py334
-rw-r--r--xtesting/tests/__init__.py0
-rw-r--r--xtesting/tests/unit/__init__.py0
-rw-r--r--xtesting/tests/unit/ci/__init__.py0
-rw-r--r--xtesting/tests/unit/ci/test_run_tests.py267
-rw-r--r--xtesting/tests/unit/ci/test_tier_builder.py93
-rw-r--r--xtesting/tests/unit/ci/test_tier_handler.py139
-rw-r--r--xtesting/tests/unit/core/__init__.py0
-rw-r--r--xtesting/tests/unit/core/test_feature.py117
-rw-r--r--xtesting/tests/unit/core/test_robotframework.py199
-rw-r--r--xtesting/tests/unit/core/test_testcase.py277
-rw-r--r--xtesting/tests/unit/core/test_unit.py98
-rw-r--r--xtesting/tests/unit/core/test_vnf.py187
-rw-r--r--xtesting/tests/unit/energy/__init__.py0
-rw-r--r--xtesting/tests/unit/energy/test_functest_energy.py371
-rw-r--r--xtesting/tests/unit/utils/__init__.py0
-rw-r--r--xtesting/tests/unit/utils/test_decorators.py125
-rw-r--r--xtesting/tests/unit/utils/test_env.py57
-rw-r--r--xtesting/utils/__init__.py0
-rw-r--r--xtesting/utils/constants.py5
-rw-r--r--xtesting/utils/decorators.py57
-rw-r--r--xtesting/utils/env.py44
36 files changed, 4224 insertions, 0 deletions
diff --git a/xtesting/__init__.py b/xtesting/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/__init__.py
diff --git a/xtesting/ci/__init__.py b/xtesting/ci/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/ci/__init__.py
diff --git a/xtesting/ci/logging.ini b/xtesting/ci/logging.ini
new file mode 100644
index 00000000..ab82073f
--- /dev/null
+++ b/xtesting/ci/logging.ini
@@ -0,0 +1,65 @@
+[loggers]
+keys=root,xtesting,api,ci,cli,core,energy,opnfv_tests,utils
+
+[handlers]
+keys=console,wconsole,file,null
+
+[formatters]
+keys=standard
+
+[logger_root]
+level=NOTSET
+handlers=null
+
+[logger_xtesting]
+level=NOTSET
+handlers=file
+qualname=xtesting
+
+[logger_ci]
+level=NOTSET
+handlers=console
+qualname=xtesting.ci
+
+[logger_core]
+level=NOTSET
+handlers=console
+qualname=xtesting.core
+
+[logger_energy]
+level=NOTSET
+handlers=wconsole
+qualname=xtesting.energy
+
+[logger_utils]
+level=NOTSET
+handlers=wconsole
+qualname=xtesting.utils
+
+[handler_null]
+class=NullHandler
+level=NOTSET
+formatter=standard
+args=()
+
+[handler_console]
+class=StreamHandler
+level=INFO
+formatter=standard
+args=(sys.stdout,)
+
+[handler_wconsole]
+class=StreamHandler
+level=WARN
+formatter=standard
+args=(sys.stdout,)
+
+[handler_file]
+class=FileHandler
+level=DEBUG
+formatter=standard
+args=("/home/opnfv/xtesting/results/xtesting.log",)
+
+[formatter_standard]
+format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
+datefmt=
diff --git a/xtesting/ci/run_tests.py b/xtesting/ci/run_tests.py
new file mode 100644
index 00000000..5c9143a3
--- /dev/null
+++ b/xtesting/ci/run_tests.py
@@ -0,0 +1,302 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Ericsson AB and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+""" The entry of running tests:
+1) Parses xtesting/ci/testcases.yaml to check which testcase(s) to be run
+2) Execute the common operations on every testcase (run, push results to db...)
+3) Return the right status code
+"""
+
+import argparse
+import importlib
+import logging
+import logging.config
+import os
+import re
+import sys
+import textwrap
+import pkg_resources
+
+import enum
+import prettytable
+import yaml
+
+from xtesting.ci import tier_builder
+from xtesting.core import testcase
+from xtesting.utils import constants
+from xtesting.utils import env
+
+LOGGER = logging.getLogger('xtesting.ci.run_tests')
+
+
+class Result(enum.Enum):
+ """The overall result in enumerated type"""
+ # pylint: disable=too-few-public-methods
+ EX_OK = os.EX_OK
+ EX_ERROR = -1
+
+
+class BlockingTestFailed(Exception):
+ """Exception when the blocking test fails"""
+ pass
+
+
+class TestNotEnabled(Exception):
+ """Exception when the test is not enabled"""
+ pass
+
+
+class RunTestsParser(object):
+ """Parser to run tests"""
+ # pylint: disable=too-few-public-methods
+
+ def __init__(self):
+ self.parser = argparse.ArgumentParser()
+ self.parser.add_argument("-t", "--test", dest="test", action='store',
+ help="Test case or tier (group of tests) "
+ "to be executed. It will run all the test "
+ "if not specified.")
+ self.parser.add_argument("-n", "--noclean", help="Do not clean "
+ "OpenStack resources after running each "
+ "test (default=false).",
+ action="store_true")
+ self.parser.add_argument("-r", "--report", help="Push results to "
+ "database (default=false).",
+ action="store_true")
+
+ def parse_args(self, argv=None):
+ """Parse arguments.
+
+ It can call sys.exit if arguments are incorrect.
+
+ Returns:
+ the arguments from cmdline
+ """
+ return vars(self.parser.parse_args(argv))
+
+
+class Runner(object):
+ """Runner class"""
+
+ def __init__(self):
+ self.executed_test_cases = {}
+ self.overall_result = Result.EX_OK
+ self.clean_flag = True
+ self.report_flag = False
+ self.tiers = tier_builder.TierBuilder(
+ env.get('INSTALLER_TYPE'),
+ env.get('DEPLOY_SCENARIO'),
+ pkg_resources.resource_filename('xtesting', 'ci/testcases.yaml'))
+
+ @staticmethod
+ def source_envfile(rc_file=constants.ENV_FILE):
+ """Source the env file passed as arg"""
+ if not os.path.isfile(rc_file):
+ LOGGER.debug("No env file %s found", rc_file)
+ return
+ with open(rc_file, "r") as rcfd:
+ for line in rcfd:
+ var = (line.rstrip('"\n').replace('export ', '').split(
+ "=") if re.search(r'(.*)=(.*)', line) else None)
+ # The two next lines should be modified as soon as rc_file
+ # conforms with common rules. Be aware that it could induce
+ # issues if value starts with '
+ if var:
+ key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
+ value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
+ os.environ[key] = value
+ rcfd.seek(0, 0)
+ LOGGER.info("Sourcing env file %s\n\n%s", rc_file, rcfd.read())
+
+ @staticmethod
+ def get_dict_by_test(testname):
+ # pylint: disable=bad-continuation,missing-docstring
+ with open(pkg_resources.resource_filename(
+ 'xtesting', 'ci/testcases.yaml')) as tyaml:
+ testcases_yaml = yaml.safe_load(tyaml)
+ for dic_tier in testcases_yaml.get("tiers"):
+ for dic_testcase in dic_tier['testcases']:
+ if dic_testcase['case_name'] == testname:
+ return dic_testcase
+ LOGGER.error('Project %s is not defined in testcases.yaml', testname)
+ return None
+
+ @staticmethod
+ def get_run_dict(testname):
+ """Obtain the 'run' block of the testcase from testcases.yaml"""
+ try:
+ dic_testcase = Runner.get_dict_by_test(testname)
+ if not dic_testcase:
+ LOGGER.error("Cannot get %s's config options", testname)
+ elif 'run' in dic_testcase:
+ return dic_testcase['run']
+ return None
+ except Exception: # pylint: disable=broad-except
+ LOGGER.exception("Cannot get %s's config options", testname)
+ return None
+
+ def run_test(self, test):
+ """Run one test case"""
+ if not test.is_enabled():
+ raise TestNotEnabled(
+ "The test case {} is not enabled".format(test.get_name()))
+ LOGGER.info("Running test case '%s'...", test.get_name())
+ result = testcase.TestCase.EX_RUN_ERROR
+ run_dict = self.get_run_dict(test.get_name())
+ if run_dict:
+ try:
+ module = importlib.import_module(run_dict['module'])
+ cls = getattr(module, run_dict['class'])
+ test_dict = Runner.get_dict_by_test(test.get_name())
+ test_case = cls(**test_dict)
+ self.executed_test_cases[test.get_name()] = test_case
+ try:
+ kwargs = run_dict['args']
+ test_case.run(**kwargs)
+ except KeyError:
+ test_case.run()
+ if self.report_flag:
+ test_case.push_to_db()
+ if test.get_project() == "xtesting":
+ result = test_case.is_successful()
+ else:
+ result = testcase.TestCase.EX_OK
+ LOGGER.info("Test result:\n\n%s\n", test_case)
+ if self.clean_flag:
+ test_case.clean()
+ except ImportError:
+ LOGGER.exception("Cannot import module %s", run_dict['module'])
+ except AttributeError:
+ LOGGER.exception("Cannot get class %s", run_dict['class'])
+ else:
+ raise Exception("Cannot import the class for the test case.")
+ return result
+
+ def run_tier(self, tier):
+ """Run one tier"""
+ tier_name = tier.get_name()
+ tests = tier.get_tests()
+ if not tests:
+ LOGGER.info("There are no supported test cases in this tier "
+ "for the given scenario")
+ self.overall_result = Result.EX_ERROR
+ else:
+ LOGGER.info("Running tier '%s'", tier_name)
+ for test in tests:
+ self.run_test(test)
+ test_case = self.executed_test_cases[test.get_name()]
+ if test_case.is_successful() != testcase.TestCase.EX_OK:
+ LOGGER.error("The test case '%s' failed.", test.get_name())
+ if test.get_project() == "xtesting":
+ self.overall_result = Result.EX_ERROR
+ if test.is_blocking():
+ raise BlockingTestFailed(
+ "The test case {} failed and is blocking".format(
+ test.get_name()))
+ return self.overall_result
+
+ def run_all(self):
+ """Run all available testcases"""
+ tiers_to_run = []
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['tiers', 'order', 'CI Loop', 'description',
+ 'testcases'])
+ for tier in self.tiers.get_tiers():
+ ci_loop = env.get('CI_LOOP')
+ if (tier.get_tests() and
+ re.search(ci_loop, tier.get_ci_loop()) is not None):
+ tiers_to_run.append(tier)
+ msg.add_row([tier.get_name(), tier.get_order(),
+ tier.get_ci_loop(),
+ textwrap.fill(tier.description, width=40),
+ textwrap.fill(' '.join([str(x.get_name(
+ )) for x in tier.get_tests()]), width=40)])
+ LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
+ for tier in tiers_to_run:
+ self.run_tier(tier)
+
+ def main(self, **kwargs):
+ """Entry point of class Runner"""
+ if 'noclean' in kwargs:
+ self.clean_flag = not kwargs['noclean']
+ if 'report' in kwargs:
+ self.report_flag = kwargs['report']
+ try:
+ LOGGER.info("Deployment description:\n\n%s\n", env.string())
+ self.source_envfile()
+ if 'test' in kwargs:
+ LOGGER.debug("Test args: %s", kwargs['test'])
+ if self.tiers.get_tier(kwargs['test']):
+ self.run_tier(self.tiers.get_tier(kwargs['test']))
+ elif self.tiers.get_test(kwargs['test']):
+ result = self.run_test(
+ self.tiers.get_test(kwargs['test']))
+ if result != testcase.TestCase.EX_OK:
+ LOGGER.error("The test case '%s' failed.",
+ kwargs['test'])
+ self.overall_result = Result.EX_ERROR
+ elif kwargs['test'] == "all":
+ self.run_all()
+ else:
+ LOGGER.error("Unknown test case or tier '%s', or not "
+ "supported by the given scenario '%s'.",
+ kwargs['test'],
+ env.get('DEPLOY_SCENARIO'))
+ LOGGER.debug("Available tiers are:\n\n%s",
+ self.tiers)
+ return Result.EX_ERROR
+ else:
+ self.run_all()
+ except BlockingTestFailed:
+ pass
+ except Exception: # pylint: disable=broad-except
+ LOGGER.exception("Failures when running testcase(s)")
+ self.overall_result = Result.EX_ERROR
+ if not self.tiers.get_test(kwargs['test']):
+ self.summary(self.tiers.get_tier(kwargs['test']))
+ LOGGER.info("Execution exit value: %s", self.overall_result)
+ return self.overall_result
+
+ def summary(self, tier=None):
+ """To generate xtesting report showing the overall results"""
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['test case', 'project', 'tier',
+ 'duration', 'result'])
+ tiers = [tier] if tier else self.tiers.get_tiers()
+ for each_tier in tiers:
+ for test in each_tier.get_tests():
+ try:
+ test_case = self.executed_test_cases[test.get_name()]
+ except KeyError:
+ msg.add_row([test.get_name(), test.get_project(),
+ each_tier.get_name(), "00:00", "SKIP"])
+ else:
+ result = 'PASS' if(test_case.is_successful(
+ ) == test_case.EX_OK) else 'FAIL'
+ msg.add_row(
+ [test_case.case_name, test_case.project_name,
+ self.tiers.get_tier_name(test_case.case_name),
+ test_case.get_duration(), result])
+ for test in each_tier.get_skipped_test():
+ msg.add_row([test.get_name(), test.get_project(),
+ each_tier.get_name(), "00:00", "SKIP"])
+ LOGGER.info("Xtesting report:\n\n%s\n", msg)
+
+
+def main():
+ """Entry point"""
+ logging.config.fileConfig(pkg_resources.resource_filename(
+ 'xtesting', 'ci/logging.ini'))
+ logging.captureWarnings(True)
+ parser = RunTestsParser()
+ args = parser.parse_args(sys.argv[1:])
+ runner = Runner()
+ return runner.main(**args).value
diff --git a/xtesting/ci/testcases.yaml b/xtesting/ci/testcases.yaml
new file mode 100644
index 00000000..c338f57e
--- /dev/null
+++ b/xtesting/ci/testcases.yaml
@@ -0,0 +1,424 @@
+---
+tiers:
+ -
+ name: healthcheck
+ order: 0
+ ci_loop: '(daily)|(weekly)'
+ description: >-
+ First tier to be executed to verify the basic
+ operations in the VIM.
+ testcases:
+ -
+ case_name: connection_check
+ project_name: xtesting
+ criteria: 100
+ blocking: true
+ description: >-
+ This test case verifies the retrieval of OpenStack clients:
+ Keystone, Glance, Neutron and Nova and may perform some
+ simple queries. When the config value of
+ snaps.use_keystone is True, xtesting must have access to
+ the cloud's private network.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: ''
+ run:
+ module:
+ 'xtesting.opnfv_tests.openstack.snaps.connection_check'
+ class: 'ConnectionCheck'
+
+ -
+ case_name: api_check
+ project_name: xtesting
+ criteria: 100
+ blocking: true
+ description: >-
+ This test case verifies the retrieval of OpenStack clients:
+ Keystone, Glance, Neutron and Nova and may perform some
+ simple queries. When the config value of
+ snaps.use_keystone is True, xtesting must have access to
+ the cloud's private network.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: '^((?!lxd).)*$'
+ run:
+ module: 'xtesting.opnfv_tests.openstack.snaps.api_check'
+ class: 'ApiCheck'
+
+ -
+ case_name: snaps_health_check
+ project_name: xtesting
+ criteria: 100
+ blocking: true
+ description: >-
+ This test case creates executes the SimpleHealthCheck
+ Python test class which creates an, image, flavor, network,
+ and Cirros VM instance and observes the console output to
+ validate the single port obtains the correct IP address.
+ dependencies:
+ installer: ''
+ scenario: '^((?!lxd).)*$'
+ run:
+ module: 'xtesting.opnfv_tests.openstack.snaps.health_check'
+ class: 'HealthCheck'
+
+ -
+ name: smoke
+ order: 1
+ ci_loop: '(daily)|(weekly)'
+ description: >-
+ Set of basic Functional tests to validate the OPNFV scenarios.
+ testcases:
+ -
+ case_name: vping_ssh
+ project_name: xtesting
+ criteria: 100
+ blocking: true
+ description: >-
+ This test case verifies: 1) SSH to an instance using
+ floating IPs over the public network. 2) Connectivity
+ between 2 instances over a private network.
+ dependencies:
+ installer: ''
+ scenario: '^((?!odl_l3|odl-bgpvpn|gluon).)*$'
+ run:
+ module: 'xtesting.opnfv_tests.openstack.vping.vping_ssh'
+ class: 'VPingSSH'
+
+ -
+ case_name: vping_userdata
+ project_name: xtesting
+ criteria: 100
+ blocking: true
+ description: >-
+ This test case verifies: 1) Boot a VM with given userdata.
+ 2) Connectivity between 2 instances over a private network.
+ dependencies:
+ installer: ''
+ scenario: '^((?!lxd).)*$'
+ run:
+ module:
+ 'xtesting.opnfv_tests.openstack.vping.vping_userdata'
+ class: 'VPingUserdata'
+
+ -
+ case_name: tempest_smoke_serial
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ This test case runs the smoke subset of the OpenStack
+ Tempest suite. The list of test cases is generated by
+ Tempest automatically and depends on the parameters of
+ the OpenStack deplopyment.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: ''
+ run:
+ module: 'xtesting.opnfv_tests.openstack.tempest.tempest'
+ class: 'TempestSmokeSerial'
+
+ -
+ case_name: rally_sanity
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ This test case runs a sub group of tests of the OpenStack
+ Rally suite in smoke mode.
+ dependencies:
+ installer: ''
+ scenario: ''
+ run:
+ module: 'xtesting.opnfv_tests.openstack.rally.rally'
+ class: 'RallySanity'
+
+ -
+ case_name: refstack_defcore
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ This test case runs a sub group of tests of the OpenStack
+ Defcore testcases by using refstack client.
+ dependencies:
+ installer: ''
+ scenario: ''
+ run:
+ module:
+ 'xtesting.opnfv_tests.openstack.refstack_client.refstack_client'
+ class: 'RefstackClient'
+
+ -
+ case_name: odl
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ Test Suite for the OpenDaylight SDN Controller. It
+ integrates some test suites from upstream using
+ Robot as the test framework.
+ dependencies:
+ installer: ''
+ scenario: 'odl'
+ run:
+ module: 'xtesting.opnfv_tests.sdn.odl.odl'
+ class: 'ODLTests'
+ args:
+ suites:
+ - /src/odl_test/csit/suites/integration/basic
+ - /src/odl_test/csit/suites/openstack/neutron
+
+ -
+ case_name: odl_netvirt
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ Test Suite for the OpenDaylight SDN Controller when
+ the NetVirt features are installed. It integrates
+ some test suites from upstream using Robot as the
+ test framework.
+ dependencies:
+ installer: 'apex'
+ scenario: 'os-odl_l3-nofeature'
+ run:
+ module: 'xtesting.opnfv_tests.sdn.odl.odl'
+ class: 'ODLTests'
+ args:
+ suites:
+ - /src/odl_test/csit/suites/integration/basic
+ - /src/odl_test/csit/suites/openstack/neutron
+ - /src/odl_test/csit/suites/openstack/connectivity
+
+ -
+ case_name: snaps_smoke
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ This test case contains tests that setup and destroy
+ environments with VMs with and without Floating IPs
+ with a newly created user and project. Set the config
+ value snaps.use_floating_ips (True|False) to toggle
+ this functionality. When the config value of
+ snaps.use_keystone is True, xtesting must have access to
+ the cloud's private network.
+
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: '^((?!lxd).)*$'
+ run:
+ module: 'xtesting.opnfv_tests.openstack.snaps.smoke'
+ class: 'SnapsSmoke'
+
+ -
+ name: features
+ order: 2
+ ci_loop: '(daily)|(weekly)'
+ description: >-
+ Test suites from feature projects
+ integrated in xtesting
+ testcases:
+ -
+ case_name: doctor-notification
+ project_name: doctor
+ criteria: 100
+ blocking: false
+ description: >-
+ Test suite from Doctor project.
+ dependencies:
+ installer: 'apex'
+ scenario: '^((?!fdio).)*$'
+ run:
+ module: 'xtesting.core.feature'
+ class: 'BashFeature'
+ args:
+ cmd: 'doctor-test'
+
+ -
+ case_name: bgpvpn
+ project_name: sdnvpn
+ criteria: 100
+ blocking: false
+ description: >-
+ Test suite from SDNVPN project.
+ dependencies:
+ installer: '(fuel)|(apex)|(netvirt)'
+ scenario: 'bgpvpn'
+ run:
+ module: 'sdnvpn.test.xtesting.run_sdnvpn_tests'
+ class: 'SdnvpnFunctest'
+
+ -
+ case_name: xtesting-odl-sfc
+ project_name: sfc
+ criteria: 100
+ blocking: false
+ description: >-
+ Test suite for odl-sfc to test two chains with one SF and
+ one chain with two SFs
+ dependencies:
+ installer: ''
+ scenario: 'odl.*sfc'
+ run:
+ module: 'xtesting.core.feature'
+ class: 'BashFeature'
+ args:
+ cmd: 'run_sfc_tests.py'
+
+ -
+ case_name: barometercollectd
+ project_name: barometer
+ criteria: 100
+ blocking: false
+ description: >-
+ Test suite for the Barometer project. Separate tests verify
+ the proper configuration and basic functionality of all the
+ collectd plugins as described in the Project Release Plan
+ dependencies:
+ installer: 'apex'
+ scenario: 'bar'
+ run:
+ module: 'baro_tests.barometer'
+ class: 'BarometerCollectd'
+
+ -
+ case_name: fds
+ project_name: fastdatastacks
+ criteria: 100
+ blocking: false
+ description: >-
+ Test Suite for the OpenDaylight SDN Controller when GBP
+ features are installed. It integrates some test suites from
+ upstream using Robot as the test framework.
+ dependencies:
+ installer: 'apex'
+ scenario: 'odl.*-fdio'
+ run:
+ module: 'xtesting.opnfv_tests.sdn.odl.odl'
+ class: 'ODLTests'
+ args:
+ suites:
+ - /src/fds/testing/robot
+
+ -
+ name: components
+ order: 3
+ ci_loop: 'weekly'
+ description: >-
+ Extensive testing of OpenStack API.
+ testcases:
+ -
+ case_name: tempest_full_parallel
+ project_name: xtesting
+ criteria: 80
+ blocking: false
+ description: >-
+ The list of test cases is generated by
+ Tempest automatically and depends on the parameters of
+ the OpenStack deplopyment.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: ''
+ run:
+ module: 'xtesting.opnfv_tests.openstack.tempest.tempest'
+ class: 'TempestFullParallel'
+
+ -
+ case_name: rally_full
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ This test case runs the full suite of scenarios of the
+ OpenStack Rally suite using several threads and iterations.
+ dependencies:
+ installer: '^((?!netvirt).)*$'
+ scenario: ''
+ run:
+ module: 'xtesting.opnfv_tests.openstack.rally.rally'
+ class: 'RallyFull'
+
+ -
+ name: vnf
+ order: 4
+ ci_loop: '(daily)|(weekly)'
+ description: >-
+ Collection of VNF test cases.
+ testcases:
+ -
+ case_name: cloudify_ims
+ project_name: xtesting
+ criteria: 80
+ blocking: false
+ description: >-
+ This test case deploys an OpenSource vIMS solution from
+ Clearwater using the Cloudify orchestrator. It also runs
+ some signaling traffic.
+ dependencies:
+ installer: ''
+ scenario: 'os-nosdn-nofeature-.*ha'
+ run:
+ module: 'xtesting.opnfv_tests.vnf.ims.cloudify_ims'
+ class: 'CloudifyIms'
+
+ -
+ case_name: vyos_vrouter
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ This test case is vRouter testing.
+ dependencies:
+ installer: ''
+ scenario: 'os-nosdn-nofeature-.*ha'
+ run:
+ module: 'xtesting.opnfv_tests.vnf.router.cloudify_vrouter'
+ class: 'CloudifyVrouter'
+
+ -
+ case_name: orchestra_openims
+ project_name: orchestra
+ enabled: false
+ criteria: 100
+ blocking: false
+ description: >-
+ OpenIMS VNF deployment with Open Baton (Orchestra)
+ dependencies:
+ installer: ''
+ scenario: 'os-nosdn-nofeature-.*ha'
+ run:
+ module: 'xtesting.opnfv_tests.vnf.ims.orchestra_openims'
+ class: 'OpenImsVnf'
+
+ -
+ case_name: orchestra_clearwaterims
+ project_name: orchestra
+ enabled: false
+ criteria: 100
+ blocking: false
+ description: >-
+ ClearwaterIMS VNF deployment with Open Baton (Orchestra)
+ dependencies:
+ installer: ''
+ scenario: 'os-nosdn-nofeature-.*ha'
+ run:
+ module:
+ 'xtesting.opnfv_tests.vnf.ims.orchestra_clearwaterims'
+ class: 'ClearwaterImsVnf'
+
+ -
+ case_name: juju_epc
+ project_name: xtesting
+ criteria: 100
+ blocking: false
+ description: >-
+ vEPC validation with Juju as VNF manager and ABoT as test
+ executor.
+ dependencies:
+ installer: ''
+ scenario: 'os-nosdn-nofeature-.*ha'
+ run:
+ module: 'xtesting.opnfv_tests.vnf.epc.juju_epc'
+ class: 'JujuEpc'
diff --git a/xtesting/ci/tier_builder.py b/xtesting/ci/tier_builder.py
new file mode 100644
index 00000000..2c7b0cab
--- /dev/null
+++ b/xtesting/ci/tier_builder.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Ericsson AB and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""TierBuilder class to parse testcases config file"""
+
+import yaml
+
+import xtesting.ci.tier_handler as th
+
+
+class TierBuilder(object):
+ # pylint: disable=missing-docstring
+
+ def __init__(self, ci_installer, ci_scenario, testcases_file):
+ self.ci_installer = ci_installer
+ self.ci_scenario = ci_scenario
+ self.testcases_file = testcases_file
+ self.dic_tier_array = None
+ self.tier_objects = []
+ self.testcases_yaml = None
+ self.generate_tiers()
+
+ def read_test_yaml(self):
+ with open(self.testcases_file) as tc_file:
+ self.testcases_yaml = yaml.safe_load(tc_file)
+
+ self.dic_tier_array = []
+ for tier in self.testcases_yaml.get("tiers"):
+ self.dic_tier_array.append(tier)
+
+ def generate_tiers(self):
+ if self.dic_tier_array is None:
+ self.read_test_yaml()
+
+ del self.tier_objects[:]
+ for dic_tier in self.dic_tier_array:
+ tier = th.Tier(
+ name=dic_tier['name'], order=dic_tier['order'],
+ ci_loop=dic_tier['ci_loop'],
+ description=dic_tier['description'])
+
+ for dic_testcase in dic_tier['testcases']:
+ installer = dic_testcase['dependencies']['installer']
+ scenario = dic_testcase['dependencies']['scenario']
+ dep = th.Dependency(installer, scenario)
+
+ testcase = th.TestCase(
+ name=dic_testcase['case_name'],
+ enabled=dic_testcase.get('enabled', True),
+ dependency=dep, criteria=dic_testcase['criteria'],
+ blocking=dic_testcase['blocking'],
+ description=dic_testcase['description'],
+ project=dic_testcase['project_name'])
+ if (testcase.is_compatible(self.ci_installer,
+ self.ci_scenario) and
+ testcase.is_enabled()):
+ tier.add_test(testcase)
+ else:
+ tier.skip_test(testcase)
+
+ self.tier_objects.append(tier)
+
+ def get_tiers(self):
+ return self.tier_objects
+
+ def get_tier_names(self):
+ tier_names = []
+ for tier in self.tier_objects:
+ tier_names.append(tier.get_name())
+ return tier_names
+
+ def get_tier(self, tier_name):
+ for i in range(0, len(self.tier_objects)):
+ if self.tier_objects[i].get_name() == tier_name:
+ return self.tier_objects[i]
+ return None
+
+ def get_tier_name(self, test_name):
+ for i in range(0, len(self.tier_objects)):
+ if self.tier_objects[i].is_test(test_name):
+ return self.tier_objects[i].name
+ return None
+
+ def get_test(self, test_name):
+ for i in range(0, len(self.tier_objects)):
+ if self.tier_objects[i].is_test(test_name):
+ return self.tier_objects[i].get_test(test_name)
+ return None
+
+ def get_tests(self, tier_name):
+ for i in range(0, len(self.tier_objects)):
+ if self.tier_objects[i].get_name() == tier_name:
+ return self.tier_objects[i].get_tests()
+ return None
+
+ def __str__(self):
+ output = ""
+ for i in range(0, len(self.tier_objects)):
+ output += str(self.tier_objects[i]) + "\n"
+ return output
diff --git a/xtesting/ci/tier_handler.py b/xtesting/ci/tier_handler.py
new file mode 100644
index 00000000..9fc3f24d
--- /dev/null
+++ b/xtesting/ci/tier_handler.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Ericsson AB and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Tier and TestCase classes to wrap the testcases config file"""
+# pylint: disable=missing-docstring
+
+import re
+import textwrap
+
+import prettytable
+
+
+LINE_LENGTH = 72
+
+
+def split_text(text, max_len):
+ words = text.split()
+ lines = []
+ line = ""
+ for word in words:
+ if len(line) + len(word) < max_len - 1:
+ line += word + " "
+ else:
+ lines.append(line)
+ line = word + " "
+ if line != "":
+ lines.append(line)
+ return lines
+
+
+class Tier(object):
+
+ def __init__(self, name, order, ci_loop, description=""):
+ self.tests_array = []
+ self.skipped_tests_array = []
+ self.name = name
+ self.order = order
+ self.ci_loop = ci_loop
+ self.description = description
+
+ def add_test(self, testcase):
+ self.tests_array.append(testcase)
+
+ def skip_test(self, testcase):
+ self.skipped_tests_array.append(testcase)
+
+ def get_tests(self):
+ array_tests = []
+ for test in self.tests_array:
+ array_tests.append(test)
+ return array_tests
+
+ def get_skipped_test(self):
+ return self.skipped_tests_array
+
+ def get_test_names(self):
+ array_tests = []
+ for test in self.tests_array:
+ array_tests.append(test.get_name())
+ return array_tests
+
+ def get_test(self, test_name):
+ if self.is_test(test_name):
+ for test in self.tests_array:
+ if test.get_name() == test_name:
+ return test
+ return None
+
+ def is_test(self, test_name):
+ for test in self.tests_array:
+ if test.get_name() == test_name:
+ return True
+ return False
+
+ def get_name(self):
+ return self.name
+
+ def get_order(self):
+ return self.order
+
+ def get_ci_loop(self):
+ return self.ci_loop
+
+ def __str__(self):
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['tiers', 'order', 'CI Loop', 'description',
+ 'testcases'])
+ msg.add_row(
+ [self.name, self.order, self.ci_loop,
+ textwrap.fill(self.description, width=40),
+ textwrap.fill(' '.join([str(x.get_name(
+ )) for x in self.get_tests()]), width=40)])
+ return msg.get_string()
+
+
+class TestCase(object):
+
+ def __init__(self, name, enabled, dependency, criteria, blocking,
+ description="", project=""):
+ # pylint: disable=too-many-arguments
+ self.name = name
+ self.enabled = enabled
+ self.dependency = dependency
+ self.criteria = criteria
+ self.blocking = blocking
+ self.description = description
+ self.project = project
+
+ @staticmethod
+ def is_none(item):
+ return item is None or item == ""
+
+ def is_compatible(self, ci_installer, ci_scenario):
+ try:
+ if not self.is_none(ci_installer):
+ if re.search(self.dependency.get_installer(),
+ ci_installer) is None:
+ return False
+ if not self.is_none(ci_scenario):
+ if re.search(self.dependency.get_scenario(),
+ ci_scenario) is None:
+ return False
+ return True
+ except TypeError:
+ return False
+
+ def get_name(self):
+ return self.name
+
+ def is_enabled(self):
+ return self.enabled
+
+ def get_criteria(self):
+ return self.criteria
+
+ def is_blocking(self):
+ return self.blocking
+
+ def get_project(self):
+ return self.project
+
+ def __str__(self):
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['test case', 'description', 'criteria', 'dependency'])
+ msg.add_row([self.name, textwrap.fill(self.description, width=40),
+ self.criteria, self.dependency])
+ return msg.get_string()
+
+
+class Dependency(object):
+
+ def __init__(self, installer, scenario):
+ self.installer = installer
+ self.scenario = scenario
+
+ def get_installer(self):
+ return self.installer
+
+ def get_scenario(self):
+ return self.scenario
+
+ def __str__(self):
+ delimitator = "\n" if self.get_installer(
+ ) and self.get_scenario() else ""
+ return "{}{}{}".format(self.get_installer(), delimitator,
+ self.get_scenario())
diff --git a/xtesting/core/__init__.py b/xtesting/core/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/core/__init__.py
diff --git a/xtesting/core/feature.py b/xtesting/core/feature.py
new file mode 100644
index 00000000..d3f86c02
--- /dev/null
+++ b/xtesting/core/feature.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 ZTE Corp and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the parent classes of all Xtesting Features.
+
+Feature is considered as TestCase offered by Third-party. It offers
+helpers to run any python method or any bash command.
+"""
+
+import logging
+import subprocess
+import time
+
+from xtesting.core import testcase
+
+__author__ = ("Serena Feng <feng.xiaowei@zte.com.cn>, "
+ "Cedric Ollivier <cedric.ollivier@orange.com>")
+
+
+class Feature(testcase.TestCase):
+ """Base model for single feature."""
+
+ __logger = logging.getLogger(__name__)
+ dir_results = "/home/opnfv/xtesting/results"
+
+ def __init__(self, **kwargs):
+ super(Feature, self).__init__(**kwargs)
+ self.result_file = "{}/{}.log".format(self.dir_results, self.case_name)
+ try:
+ module = kwargs['run']['module']
+ self.logger = logging.getLogger(module)
+ except KeyError:
+ self.__logger.warning(
+ "Cannot get module name %s. Using %s as fallback",
+ kwargs, self.case_name)
+ self.logger = logging.getLogger(self.case_name)
+ handler = logging.StreamHandler()
+ handler.setLevel(logging.WARN)
+ self.logger.addHandler(handler)
+ handler = logging.FileHandler(self.result_file)
+ handler.setLevel(logging.DEBUG)
+ self.logger.addHandler(handler)
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(formatter)
+ self.logger.addHandler(handler)
+
+ def execute(self, **kwargs):
+ """Execute the Python method.
+
+ The subclasses must override the default implementation which
+ is false on purpose.
+
+ The new implementation must return 0 if success or anything
+ else if failure.
+
+ Args:
+ kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ -1.
+ """
+ # pylint: disable=unused-argument,no-self-use
+ return -1
+
+ def run(self, **kwargs):
+ """Run the feature.
+
+ It allows executing any Python method by calling execute().
+
+ It sets the following attributes required to push the results
+ to DB:
+
+ * result,
+ * start_time,
+ * stop_time.
+
+ It doesn't fulfill details when pushing the results to the DB.
+
+ Args:
+ kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ TestCase.EX_OK if execute() returns 0,
+ TestCase.EX_RUN_ERROR otherwise.
+ """
+ self.start_time = time.time()
+ exit_code = testcase.TestCase.EX_RUN_ERROR
+ self.result = 0
+ try:
+ if self.execute(**kwargs) == 0:
+ exit_code = testcase.TestCase.EX_OK
+ self.result = 100
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("%s FAILED", self.project_name)
+ self.__logger.info("Test result is stored in '%s'", self.result_file)
+ self.stop_time = time.time()
+ return exit_code
+
+
+class BashFeature(Feature):
+ """Class designed to run any bash command."""
+
+ __logger = logging.getLogger(__name__)
+
+ def execute(self, **kwargs):
+ """Execute the cmd passed as arg
+
+ Args:
+ kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ 0 if cmd returns 0,
+ -1 otherwise.
+ """
+ ret = -1
+ try:
+ cmd = kwargs["cmd"]
+ with open(self.result_file, 'w+') as f_stdout:
+ proc = subprocess.Popen(cmd.split(), stdout=f_stdout,
+ stderr=subprocess.STDOUT)
+ ret = proc.wait()
+ if ret != 0:
+ self.__logger.error("Execute command: %s failed", cmd)
+ except KeyError:
+ self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs)
+ return ret
diff --git a/xtesting/core/robotframework.py b/xtesting/core/robotframework.py
new file mode 100644
index 00000000..4d3746aa
--- /dev/null
+++ b/xtesting/core/robotframework.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define classes required to run any Robot suites."""
+
+from __future__ import division
+
+import errno
+import logging
+import os
+
+import robot.api
+from robot.errors import RobotError
+import robot.run
+from robot.utils.robottime import timestamp_to_secs
+from six import StringIO
+
+from xtesting.core import testcase
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+
+class ResultVisitor(robot.api.ResultVisitor):
+ """Visitor to get result details."""
+
+ def __init__(self):
+ self._data = []
+
+ def visit_test(self, test):
+ output = {}
+ output['name'] = test.name
+ output['parent'] = test.parent.name
+ output['status'] = test.status
+ output['starttime'] = test.starttime
+ output['endtime'] = test.endtime
+ output['critical'] = test.critical
+ output['text'] = test.message
+ output['elapsedtime'] = test.elapsedtime
+ self._data.append(output)
+
+ def get_data(self):
+ """Get the details of the result."""
+ return self._data
+
+
+class RobotFramework(testcase.TestCase):
+ """RobotFramework runner."""
+
+ __logger = logging.getLogger(__name__)
+ dir_results = "/home/opnfv/xtesting/results"
+
+ def __init__(self, **kwargs):
+ self.res_dir = os.path.join(self.dir_results, 'robot')
+ self.xml_file = os.path.join(self.res_dir, 'output.xml')
+ super(RobotFramework, self).__init__(**kwargs)
+
+ def parse_results(self):
+ """Parse output.xml and get the details in it."""
+ result = robot.api.ExecutionResult(self.xml_file)
+ visitor = ResultVisitor()
+ result.visit(visitor)
+ try:
+ self.result = 100 * (
+ result.suite.statistics.critical.passed /
+ result.suite.statistics.critical.total)
+ except ZeroDivisionError:
+ self.__logger.error("No test has been run")
+ self.start_time = timestamp_to_secs(result.suite.starttime)
+ self.stop_time = timestamp_to_secs(result.suite.endtime)
+ self.details = {}
+ self.details['description'] = result.suite.name
+ self.details['tests'] = visitor.get_data()
+
+ def run(self, **kwargs):
+ """Run the RobotFramework suites
+
+ Here are the steps:
+ * create the output directories if required,
+ * get the results in output.xml,
+ * delete temporary files.
+
+ Args:
+ kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ EX_OK if all suites ran well.
+ EX_RUN_ERROR otherwise.
+ """
+ try:
+ suites = kwargs["suites"]
+ variable = kwargs.get("variable", [])
+ variablefile = kwargs.get("variablefile", [])
+ except KeyError:
+ self.__logger.exception("Mandatory args were not passed")
+ return self.EX_RUN_ERROR
+ try:
+ os.makedirs(self.res_dir)
+ except OSError as ex:
+ if ex.errno != errno.EEXIST:
+ self.__logger.exception("Cannot create %s", self.res_dir)
+ return self.EX_RUN_ERROR
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot create %s", self.res_dir)
+ return self.EX_RUN_ERROR
+ stream = StringIO()
+ robot.run(*suites, variable=variable, variablefile=variablefile,
+ output=self.xml_file, log='NONE',
+ report='NONE', stdout=stream)
+ self.__logger.info("\n" + stream.getvalue())
+ self.__logger.info("Results were successfully generated")
+ try:
+ self.parse_results()
+ self.__logger.info("Results were successfully parsed")
+ except RobotError as ex:
+ self.__logger.error("Run suites before publishing: %s", ex.message)
+ return self.EX_RUN_ERROR
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot parse results")
+ return self.EX_RUN_ERROR
+ return self.EX_OK
diff --git a/xtesting/core/testcase.py b/xtesting/core/testcase.py
new file mode 100644
index 00000000..4effa932
--- /dev/null
+++ b/xtesting/core/testcase.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the parent class of all Xtesting TestCases."""
+
+from datetime import datetime
+import json
+import logging
+import os
+import re
+import requests
+
+from xtesting.utils import decorators
+from xtesting.utils import env
+
+
+import prettytable
+
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+
+class TestCase(object):
+ """Base model for single test case."""
+
+ EX_OK = os.EX_OK
+ """everything is OK"""
+
+ EX_RUN_ERROR = os.EX_SOFTWARE
+ """run() failed"""
+
+ EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1
+ """push_to_db() failed"""
+
+ EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2
+ """results are false"""
+
+ _job_name_rule = "(dai|week)ly-(.+?)-[0-9]*"
+ _headers = {'Content-Type': 'application/json'}
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ self.details = {}
+ self.project_name = kwargs.get('project_name', 'xtesting')
+ self.case_name = kwargs.get('case_name', '')
+ self.criteria = kwargs.get('criteria', 100)
+ self.result = 0
+ self.start_time = 0
+ self.stop_time = 0
+
+ def __str__(self):
+ try:
+ assert self.project_name
+ assert self.case_name
+ result = 'PASS' if(self.is_successful(
+ ) == TestCase.EX_OK) else 'FAIL'
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['test case', 'project', 'duration',
+ 'result'])
+ msg.add_row([self.case_name, self.project_name,
+ self.get_duration(), result])
+ return msg.get_string()
+ except AssertionError:
+ self.__logger.error("We cannot print invalid objects")
+ return super(TestCase, self).__str__()
+
+ def get_duration(self):
+ """Return the duration of the test case.
+
+ Returns:
+ duration if start_time and stop_time are set
+ "XX:XX" otherwise.
+ """
+ try:
+ assert self.start_time
+ assert self.stop_time
+ if self.stop_time < self.start_time:
+ return "XX:XX"
+ return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod(
+ self.stop_time - self.start_time, 60))
+ except Exception: # pylint: disable=broad-except
+ self.__logger.error("Please run test before getting the duration")
+ return "XX:XX"
+
+ def is_successful(self):
+ """Interpret the result of the test case.
+
+ It allows getting the result of TestCase. It completes run()
+ which only returns the execution status.
+
+ It can be overriden if checking result is not suitable.
+
+ Returns:
+ TestCase.EX_OK if result is 'PASS'.
+ TestCase.EX_TESTCASE_FAILED otherwise.
+ """
+ try:
+ assert self.criteria
+ assert self.result is not None
+ if (not isinstance(self.result, str) and
+ not isinstance(self.criteria, str)):
+ if self.result >= self.criteria:
+ return TestCase.EX_OK
+ else:
+ # Backward compatibility
+ # It must be removed as soon as TestCase subclasses
+ # stop setting result = 'PASS' or 'FAIL'.
+ # In this case criteria is unread.
+ self.__logger.warning(
+ "Please update result which must be an int!")
+ if self.result == 'PASS':
+ return TestCase.EX_OK
+ except AssertionError:
+ self.__logger.error("Please run test before checking the results")
+ return TestCase.EX_TESTCASE_FAILED
+
+ def run(self, **kwargs):
+ """Run the test case.
+
+ It allows running TestCase and getting its execution
+ status.
+
+ The subclasses must override the default implementation which
+ is false on purpose.
+
+ The new implementation must set the following attributes to
+ push the results to DB:
+
+ * result,
+ * start_time,
+ * stop_time.
+
+ Args:
+ kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ TestCase.EX_RUN_ERROR.
+ """
+ # pylint: disable=unused-argument
+ self.__logger.error("Run must be implemented")
+ return TestCase.EX_RUN_ERROR
+
+ @decorators.can_dump_request_to_file
+ def push_to_db(self):
+ """Push the results of the test case to the DB.
+
+ It allows publishing the results and checking the status.
+
+ It could be overriden if the common implementation is not
+ suitable.
+
+ The following attributes must be set before pushing the results to DB:
+
+ * project_name,
+ * case_name,
+ * result,
+ * start_time,
+ * stop_time.
+
+ The next vars must be set in env:
+
+ * TEST_DB_URL,
+ * INSTALLER_TYPE,
+ * DEPLOY_SCENARIO,
+ * NODE_NAME,
+ * BUILD_TAG.
+
+ Returns:
+ TestCase.EX_OK if results were pushed to DB.
+ TestCase.EX_PUSH_TO_DB_ERROR otherwise.
+ """
+ try:
+ assert self.project_name
+ assert self.case_name
+ assert self.start_time
+ assert self.stop_time
+ url = env.get('TEST_DB_URL')
+ data = {"project_name": self.project_name,
+ "case_name": self.case_name,
+ "details": self.details}
+ data["installer"] = env.get('INSTALLER_TYPE')
+ data["scenario"] = env.get('DEPLOY_SCENARIO')
+ data["pod_name"] = env.get('NODE_NAME')
+ data["build_tag"] = env.get('BUILD_TAG')
+ data["criteria"] = 'PASS' if self.is_successful(
+ ) == TestCase.EX_OK else 'FAIL'
+ data["start_date"] = datetime.fromtimestamp(
+ self.start_time).strftime('%Y-%m-%d %H:%M:%S')
+ data["stop_date"] = datetime.fromtimestamp(
+ self.stop_time).strftime('%Y-%m-%d %H:%M:%S')
+ try:
+ data["version"] = re.search(
+ TestCase._job_name_rule,
+ env.get('BUILD_TAG')).group(2)
+ except Exception: # pylint: disable=broad-except
+ data["version"] = "unknown"
+ req = requests.post(
+ url, data=json.dumps(data, sort_keys=True),
+ headers=self._headers)
+ req.raise_for_status()
+ self.__logger.info(
+ "The results were successfully pushed to DB %s", url)
+ except AssertionError:
+ self.__logger.exception(
+ "Please run test before publishing the results")
+ return TestCase.EX_PUSH_TO_DB_ERROR
+ except requests.exceptions.HTTPError:
+ self.__logger.exception("The HTTP request raises issues")
+ return TestCase.EX_PUSH_TO_DB_ERROR
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("The results cannot be pushed to DB")
+ return TestCase.EX_PUSH_TO_DB_ERROR
+ return TestCase.EX_OK
+
+ def clean(self):
+ """Clean the resources.
+
+ It can be overriden if resources must be deleted after
+ running the test case.
+ """
diff --git a/xtesting/core/unit.py b/xtesting/core/unit.py
new file mode 100644
index 00000000..27773679
--- /dev/null
+++ b/xtesting/core/unit.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Cable Television Laboratories, Inc. and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the parent class to run unittest.TestSuite as TestCase."""
+
+from __future__ import division
+
+import logging
+import time
+import unittest
+
+import six
+
+from xtesting.core import testcase
+
+__author__ = ("Steven Pisarski <s.pisarski@cablelabs.com>, "
+ "Cedric Ollivier <cedric.ollivier@orange.com>")
+
+
+class Suite(testcase.TestCase):
+ """Base model for running unittest.TestSuite."""
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ super(Suite, self).__init__(**kwargs)
+ self.suite = None
+
+ def run(self, **kwargs):
+ """Run the test suite.
+
+ It allows running any unittest.TestSuite and getting its
+ execution status.
+
+ By default, it runs the suite defined as instance attribute.
+ It can be overriden by passing name as arg. It must
+ conform with TestLoader.loadTestsFromName().
+
+ It sets the following attributes required to push the results
+ to DB:
+
+ * result,
+ * start_time,
+ * stop_time,
+ * details.
+
+ Args:
+ kwargs: Arbitrary keyword arguments.
+
+ Returns:
+ TestCase.EX_OK if any TestSuite has been run,
+ TestCase.EX_RUN_ERROR otherwise.
+ """
+ try:
+ name = kwargs["name"]
+ try:
+ self.suite = unittest.TestLoader().loadTestsFromName(name)
+ except ImportError:
+ self.__logger.error("Can not import %s", name)
+ return testcase.TestCase.EX_RUN_ERROR
+ except KeyError:
+ pass
+ try:
+ assert self.suite
+ self.start_time = time.time()
+ stream = six.StringIO()
+ result = unittest.TextTestRunner(
+ stream=stream, verbosity=2).run(self.suite)
+ self.__logger.debug("\n\n%s", stream.getvalue())
+ self.stop_time = time.time()
+ self.details = {
+ "testsRun": result.testsRun,
+ "failures": len(result.failures),
+ "errors": len(result.errors),
+ "stream": stream.getvalue()}
+ self.result = 100 * (
+ (result.testsRun - (len(result.failures) +
+ len(result.errors))) /
+ result.testsRun)
+ return testcase.TestCase.EX_OK
+ except AssertionError:
+ self.__logger.error("No suite is defined")
+ return testcase.TestCase.EX_RUN_ERROR
+ except ZeroDivisionError:
+ self.__logger.error("No test has been run")
+ return testcase.TestCase.EX_RUN_ERROR
diff --git a/xtesting/core/vnf.py b/xtesting/core/vnf.py
new file mode 100644
index 00000000..95ebde04
--- /dev/null
+++ b/xtesting/core/vnf.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the parent class of all VNF TestCases."""
+
+import logging
+import time
+import uuid
+
+from snaps.config.user import UserConfig
+from snaps.config.project import ProjectConfig
+from snaps.openstack.create_user import OpenStackUser
+from snaps.openstack.create_project import OpenStackProject
+from snaps.openstack.tests import openstack_tests
+
+from xtesting.core import testcase
+from xtesting.utils import constants
+
+__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, "
+ "Valentin Boucher <valentin.boucher@orange.com>")
+
+
+class VnfPreparationException(Exception):
+ """Raise when VNF preparation cannot be executed."""
+
+
+class OrchestratorDeploymentException(Exception):
+ """Raise when orchestrator cannot be deployed."""
+
+
+class VnfDeploymentException(Exception):
+ """Raise when VNF cannot be deployed."""
+
+
+class VnfTestException(Exception):
+ """Raise when VNF cannot be tested."""
+
+
+class VnfOnBoarding(testcase.TestCase):
+ # pylint: disable=too-many-instance-attributes
+ """Base model for VNF test cases."""
+
+ __logger = logging.getLogger(__name__)
+
+ def __init__(self, **kwargs):
+ super(VnfOnBoarding, self).__init__(**kwargs)
+ self.uuid = uuid.uuid4()
+ self.user_name = "{}-{}".format(self.case_name, self.uuid)
+ self.tenant_name = "{}-{}".format(self.case_name, self.uuid)
+ self.snaps_creds = {}
+ self.created_object = []
+ self.os_project = None
+ self.tenant_description = "Created by OPNFV Functest: {}".format(
+ self.case_name)
+
+ def run(self, **kwargs):
+ """
+ Run of the VNF test case:
+
+ * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...),
+ * Deploy the VNF,
+ * Perform tests on the VNF
+
+ A VNF test case is successfull when the 3 steps are PASS
+ If one of the step is FAIL, the test case is FAIL
+
+ Returns:
+ TestCase.EX_OK if result is 'PASS'.
+ TestCase.EX_TESTCASE_FAILED otherwise.
+ """
+ self.start_time = time.time()
+
+ try:
+ self.prepare()
+ if (self.deploy_orchestrator() and
+ self.deploy_vnf() and
+ self.test_vnf()):
+ self.stop_time = time.time()
+ # Calculation with different weight depending on the steps TODO
+ self.result = 100
+ return testcase.TestCase.EX_OK
+ self.result = 0
+ self.stop_time = time.time()
+ return testcase.TestCase.EX_TESTCASE_FAILED
+ except Exception: # pylint: disable=broad-except
+ self.stop_time = time.time()
+ self.__logger.exception("Exception on VNF testing")
+ return testcase.TestCase.EX_TESTCASE_FAILED
+
+ def prepare(self):
+ """
+ Prepare the environment for VNF testing:
+
+ * Creation of a user,
+ * Creation of a tenant,
+ * Allocation admin role to the user on this tenant
+
+ Returns base.TestCase.EX_OK if preparation is successfull
+
+ Raise VnfPreparationException in case of problem
+ """
+ try:
+ self.__logger.info(
+ "Prepare VNF: %s, description: %s", self.case_name,
+ self.tenant_description)
+ snaps_creds = openstack_tests.get_credentials(
+ os_env_file=constants.ENV_FILE)
+
+ self.os_project = OpenStackProject(
+ snaps_creds,
+ ProjectConfig(
+ name=self.tenant_name,
+ description=self.tenant_description
+ ))
+ self.os_project.create()
+ self.created_object.append(self.os_project)
+ user_creator = OpenStackUser(
+ snaps_creds,
+ UserConfig(
+ name=self.user_name,
+ password=str(uuid.uuid4()),
+ roles={'admin': self.tenant_name}))
+ user_creator.create()
+ self.created_object.append(user_creator)
+ self.snaps_creds = user_creator.get_os_creds(self.tenant_name)
+
+ return testcase.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Exception raised during VNF preparation")
+ raise VnfPreparationException
+
+ def deploy_orchestrator(self):
+ """
+ Deploy an orchestrator (optional).
+
+ If this method is overriden then raise orchestratorDeploymentException
+ if error during orchestrator deployment
+ """
+ self.__logger.info("Deploy orchestrator (if necessary)")
+ return True
+
+ def deploy_vnf(self):
+ """
+ Deploy the VNF
+
+ This function MUST be implemented by vnf test cases.
+ The details section MAY be updated in the vnf test cases.
+
+ The deployment can be executed via a specific orchestrator
+ or using build-in orchestrators such as heat, OpenBaton, cloudify,
+ juju, onap, ...
+
+ Returns:
+ True if the VNF is properly deployed
+ False if the VNF is not deployed
+
+ Raise VnfDeploymentException if error during VNF deployment
+ """
+ self.__logger.error("VNF must be deployed")
+ raise VnfDeploymentException
+
+ def test_vnf(self):
+ """
+ Test the VNF
+
+ This function MUST be implemented by vnf test cases.
+ The details section MAY be updated in the vnf test cases.
+
+ Once a VNF is deployed, it is assumed that specific test suite can be
+ run to validate the VNF.
+ Please note that the same test suite can be used on several test case
+ (e.g. clearwater test suite can be used whatever the orchestrator used
+ for the deployment)
+
+ Returns:
+ True if VNF tests are PASS
+ False if test suite is FAIL
+
+ Raise VnfTestException if error during VNF test
+ """
+ self.__logger.error("VNF must be tested")
+ raise VnfTestException
+
+ def clean(self):
+ """
+ Clean VNF test case.
+
+ It is up to the test providers to delete resources used for the tests.
+ By default we clean:
+
+ * the user,
+ * the tenant
+ """
+ self.__logger.info('Removing the VNF resources ..')
+ for creator in reversed(self.created_object):
+ try:
+ creator.clean()
+ except Exception as exc: # pylint: disable=broad-except
+ self.__logger.error('Unexpected error cleaning - %s', exc)
diff --git a/xtesting/energy/__init__.py b/xtesting/energy/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/energy/__init__.py
diff --git a/xtesting/energy/energy.py b/xtesting/energy/energy.py
new file mode 100644
index 00000000..76e4873c
--- /dev/null
+++ b/xtesting/energy/energy.py
@@ -0,0 +1,334 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""This module manages calls to Energy recording API."""
+
+import json
+import logging
+import traceback
+
+from functools import wraps
+import requests
+from six.moves import urllib
+
+from xtesting.utils import env
+
+
+def finish_session(current_scenario):
+ """Finish a recording session."""
+ if current_scenario is None:
+ EnergyRecorder.stop()
+ else:
+ EnergyRecorder.logger.debug("Restoring previous scenario (%s/%s)",
+ current_scenario["scenario"],
+ current_scenario["step"])
+ EnergyRecorder.submit_scenario(
+ current_scenario["scenario"],
+ current_scenario["step"]
+ )
+
+
+def enable_recording(method):
+ """
+ Record energy during method execution.
+
+ Decorator to record energy during "method" exection.
+
+ param method: Method to suround with start and stop
+ :type method: function
+
+ .. note:: "method" should belong to a class having a "case_name"
+ attribute
+ """
+ @wraps(method)
+ def wrapper(*args):
+ """
+ Record energy during method execution (implementation).
+
+ Wrapper for decorator to handle method arguments.
+ """
+ current_scenario = EnergyRecorder.get_current_scenario()
+ EnergyRecorder.start(args[0].case_name)
+ try:
+ return_value = method(*args)
+ finish_session(current_scenario)
+ except Exception as exc: # pylint: disable=broad-except
+ EnergyRecorder.logger.exception(exc)
+ finish_session(current_scenario)
+ raise exc
+ return return_value
+ return wrapper
+
+
+# Class to manage energy recording sessions
+class EnergyRecorder(object):
+ """Manage Energy recording session."""
+
+ logger = logging.getLogger(__name__)
+ # Energy recording API connectivity settings
+ # see load_config method
+ energy_recorder_api = None
+
+ # Default initial step
+ INITIAL_STEP = "running"
+
+ # Default connection timeout
+ CONNECTION_TIMEOUT = 4
+
+ @staticmethod
+ def load_config():
+ """
+ Load connectivity settings from yaml.
+
+ Load connectivity settings to Energy recording API
+ """
+ # Singleton pattern for energy_recorder_api static member
+ # Load only if not previouly done
+ if EnergyRecorder.energy_recorder_api is None:
+ assert env.get('NODE_NAME')
+ assert env.get('ENERGY_RECORDER_API_URL')
+ environment = env.get('NODE_NAME')
+ energy_recorder_uri = env.get(
+ 'ENERGY_RECORDER_API_URL')
+
+ # Creds
+ creds_usr = env.get("ENERGY_RECORDER_API_USER")
+ creds_pass = env.get("ENERGY_RECORDER_API_PASSWORD")
+
+ uri_comp = "/recorders/environment/"
+ uri_comp += urllib.parse.quote_plus(environment)
+
+ if creds_usr and creds_pass:
+ energy_recorder_api_auth = (creds_usr, creds_pass)
+ else:
+ energy_recorder_api_auth = None
+
+ try:
+ resp = requests.get(energy_recorder_uri + "/monitoring/ping",
+ auth=energy_recorder_api_auth,
+ headers={
+ 'content-type': 'application/json'
+ },
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT)
+ api_available = json.loads(resp.text)["status"] == "OK"
+ EnergyRecorder.logger.info(
+ "API recorder available at : %s",
+ energy_recorder_uri + uri_comp)
+ except Exception as exc: # pylint: disable=broad-except
+ EnergyRecorder.logger.info(
+ "Energy recorder API is not available, cause=%s",
+ str(exc))
+ api_available = False
+ # Final config
+ EnergyRecorder.energy_recorder_api = {
+ "uri": energy_recorder_uri + uri_comp,
+ "auth": energy_recorder_api_auth,
+ "available": api_available
+ }
+ return EnergyRecorder.energy_recorder_api["available"]
+
+ @staticmethod
+ def submit_scenario(scenario, step):
+ """
+ Submit a complet scenario definition to Energy recorder API.
+
+ param scenario: Scenario name
+ :type scenario: string
+ param step: Step name
+ :type step: string
+ """
+ try:
+ return_status = True
+ # Ensure that connectyvity settings are loaded
+ if EnergyRecorder.load_config():
+ EnergyRecorder.logger.debug("Submitting scenario (%s/%s)",
+ scenario, step)
+
+ # Create API payload
+ payload = {
+ "step": step,
+ "scenario": scenario
+ }
+ # Call API to start energy recording
+ response = requests.post(
+ EnergyRecorder.energy_recorder_api["uri"],
+ data=json.dumps(payload),
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers={
+ 'content-type': 'application/json'
+ },
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+ if response.status_code != 200:
+ EnergyRecorder.logger.error(
+ "Error while submitting scenario\n%s",
+ response.text)
+ return_status = False
+ except requests.exceptions.ConnectionError:
+ EnergyRecorder.logger.warning(
+ "submit_scenario: Unable to connect energy recorder API")
+ return_status = False
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.info(
+ "Error while submitting scenarion to energy recorder API\n%s",
+ traceback.format_exc()
+ )
+ return_status = False
+ return return_status
+
+ @staticmethod
+ def start(scenario):
+ """
+ Start a recording session for scenario.
+
+ param scenario: Starting scenario
+ :type scenario: string
+ """
+ return_status = True
+ try:
+ if EnergyRecorder.load_config():
+ EnergyRecorder.logger.debug("Starting recording")
+ return_status = EnergyRecorder.submit_scenario(
+ scenario,
+ EnergyRecorder.INITIAL_STEP
+ )
+
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.info(
+ "Error while starting energy recorder API\n%s",
+ traceback.format_exc()
+ )
+ return_status = False
+ return return_status
+
+ @staticmethod
+ def stop():
+ """Stop current recording session."""
+ return_status = True
+ try:
+ # Ensure that connectyvity settings are loaded
+ if EnergyRecorder.load_config():
+ EnergyRecorder.logger.debug("Stopping recording")
+
+ # Call API to stop energy recording
+ response = requests.delete(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers={
+ 'content-type': 'application/json'
+ },
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+ if response.status_code != 200:
+ EnergyRecorder.logger.error(
+ "Error while stating energy recording session\n%s",
+ response.text)
+ return_status = False
+ except requests.exceptions.ConnectionError:
+ EnergyRecorder.logger.warning(
+ "stop: Unable to connect energy recorder API")
+ return_status = False
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.info(
+ "Error while stoping energy recorder API\n%s",
+ traceback.format_exc()
+ )
+ return_status = False
+ return return_status
+
+ @staticmethod
+ def set_step(step):
+ """Notify energy recording service of current step of the testcase."""
+ return_status = True
+ try:
+ # Ensure that connectyvity settings are loaded
+ if EnergyRecorder.load_config():
+ EnergyRecorder.logger.debug("Setting step")
+
+ # Create API payload
+ payload = {
+ "step": step,
+ }
+
+ # Call API to define step
+ response = requests.post(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ data=json.dumps(payload),
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers={
+ 'content-type': 'application/json'
+ },
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+ if response.status_code != 200:
+ EnergyRecorder.logger.error(
+ "Error while setting current step of testcase\n%s",
+ response.text)
+ return_status = False
+ except requests.exceptions.ConnectionError:
+ EnergyRecorder.logger.warning(
+ "set_step: Unable to connect energy recorder API")
+ return_status = False
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.info(
+ "Error while setting step on energy recorder API\n%s",
+ traceback.format_exc()
+ )
+ return_status = False
+ return return_status
+
+ @staticmethod
+ def get_current_scenario():
+ """Get current running scenario (if any, None else)."""
+ return_value = None
+ try:
+ # Ensure that connectyvity settings are loaded
+ if EnergyRecorder.load_config():
+ EnergyRecorder.logger.debug("Getting current scenario")
+
+ # Call API get running scenario
+ response = requests.get(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+ if response.status_code == 200:
+ return_value = json.loads(response.text)
+ elif response.status_code == 404:
+ EnergyRecorder.logger.info(
+ "No current running scenario at %s",
+ EnergyRecorder.energy_recorder_api["uri"])
+ return_value = None
+ else:
+ EnergyRecorder.logger.error(
+ "Error while getting current scenario\n%s",
+ response.text)
+ return_value = None
+ except requests.exceptions.ConnectionError:
+ EnergyRecorder.logger.warning(
+ "get_currernt_sceario: Unable to connect energy recorder API")
+ return_value = None
+ except Exception: # pylint: disable=broad-except
+ # Default exception handler to ensure that method
+ # is safe for caller
+ EnergyRecorder.logger.info(
+ "Error while getting current scenario from energy recorder API"
+ "\n%s", traceback.format_exc()
+ )
+ return_value = None
+ return return_value
diff --git a/xtesting/tests/__init__.py b/xtesting/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/__init__.py
diff --git a/xtesting/tests/unit/__init__.py b/xtesting/tests/unit/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/unit/__init__.py
diff --git a/xtesting/tests/unit/ci/__init__.py b/xtesting/tests/unit/ci/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/unit/ci/__init__.py
diff --git a/xtesting/tests/unit/ci/test_run_tests.py b/xtesting/tests/unit/ci/test_run_tests.py
new file mode 100644
index 00000000..de2af66d
--- /dev/null
+++ b/xtesting/tests/unit/ci/test_run_tests.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+import os
+
+import mock
+
+from xtesting.ci import run_tests
+from xtesting.core.testcase import TestCase
+
+
+class FakeModule(TestCase):
+
+ def run(self, **kwargs):
+ return TestCase.EX_OK
+
+
+class RunTestsTesting(unittest.TestCase):
+
+ def setUp(self):
+ self.runner = run_tests.Runner()
+ mock_test_case = mock.Mock()
+ mock_test_case.is_successful.return_value = TestCase.EX_OK
+ self.runner.executed_test_cases['test1'] = mock_test_case
+ self.runner.executed_test_cases['test2'] = mock_test_case
+ self.sep = 'test_sep'
+ self.creds = {'OS_AUTH_URL': 'http://test_ip:test_port/v2.0',
+ 'OS_USERNAME': 'test_os_username',
+ 'OS_TENANT_NAME': 'test_tenant',
+ 'OS_PASSWORD': 'test_password'}
+ self.test = {'test_name': 'test_name'}
+ self.tier = mock.Mock()
+ test1 = mock.Mock()
+ test1.get_name.return_value = 'test1'
+ test2 = mock.Mock()
+ test2.get_name.return_value = 'test2'
+ attrs = {'get_name.return_value': 'test_tier',
+ 'get_tests.return_value': [test1, test2],
+ 'get_ci_loop.return_value': 'test_ci_loop',
+ 'get_test_names.return_value': ['test1', 'test2']}
+ self.tier.configure_mock(**attrs)
+
+ self.tiers = mock.Mock()
+ attrs = {'get_tiers.return_value': [self.tier]}
+ self.tiers.configure_mock(**attrs)
+
+ self.run_tests_parser = run_tests.RunTestsParser()
+
+ @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test')
+ def test_get_run_dict(self, *args):
+ retval = {'run': mock.Mock()}
+ args[0].return_value = retval
+ self.assertEqual(self.runner.get_run_dict('test_name'), retval['run'])
+ args[0].assert_called_once_with('test_name')
+
+ @mock.patch('xtesting.ci.run_tests.LOGGER.error')
+ @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test',
+ return_value=None)
+ def test_get_run_dict_config_ko(self, *args):
+ testname = 'test_name'
+ self.assertEqual(self.runner.get_run_dict(testname), None)
+ args[0].return_value = {}
+ self.assertEqual(self.runner.get_run_dict(testname), None)
+ calls = [mock.call(testname), mock.call(testname)]
+ args[0].assert_has_calls(calls)
+ calls = [mock.call("Cannot get %s's config options", testname),
+ mock.call("Cannot get %s's config options", testname)]
+ args[1].assert_has_calls(calls)
+
+ @mock.patch('xtesting.ci.run_tests.LOGGER.exception')
+ @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test',
+ side_effect=Exception)
+ def test_get_run_dict_exception(self, *args):
+ testname = 'test_name'
+ self.assertEqual(self.runner.get_run_dict(testname), None)
+ args[1].assert_called_once_with(
+ "Cannot get %s's config options", testname)
+
+ def _test_source_envfile(self, msg, key='OS_TENANT_NAME', value='admin'):
+ try:
+ del os.environ[key]
+ except Exception: # pylint: disable=broad-except
+ pass
+ envfile = 'rc_file'
+ with mock.patch('six.moves.builtins.open',
+ mock.mock_open(read_data=msg)) as mock_method,\
+ mock.patch('os.path.isfile', return_value=True):
+ mock_method.return_value.__iter__ = lambda self: iter(
+ self.readline, '')
+ self.runner.source_envfile(envfile)
+ mock_method.assert_called_once_with(envfile, 'r')
+ self.assertEqual(os.environ[key], value)
+
+ def test_source_envfile(self):
+ self._test_source_envfile('OS_TENANT_NAME=admin')
+ self._test_source_envfile('OS_TENANT_NAME= admin')
+ self._test_source_envfile('OS_TENANT_NAME = admin')
+ self._test_source_envfile('OS_TENANT_NAME = "admin"')
+ self._test_source_envfile('export OS_TENANT_NAME=admin')
+ self._test_source_envfile('export OS_TENANT_NAME =admin')
+ self._test_source_envfile('export OS_TENANT_NAME = admin')
+ self._test_source_envfile('export OS_TENANT_NAME = "admin"')
+ # This test will fail as soon as rc_file is fixed
+ self._test_source_envfile(
+ 'export "\'OS_TENANT_NAME\'" = "\'admin\'"')
+
+ def test_get_dict_by_test(self):
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
+ mock.patch('yaml.safe_load') as mock_yaml:
+ mock_obj = mock.Mock()
+ testcase_dict = {'case_name': 'testname',
+ 'criteria': 50}
+ attrs = {'get.return_value': [{'testcases': [testcase_dict]}]}
+ mock_obj.configure_mock(**attrs)
+ mock_yaml.return_value = mock_obj
+ self.assertDictEqual(
+ run_tests.Runner.get_dict_by_test('testname'),
+ testcase_dict)
+
+ @mock.patch('xtesting.ci.run_tests.Runner.get_run_dict',
+ return_value=None)
+ def test_run_tests_import_exception(self, *args):
+ mock_test = mock.Mock()
+ kwargs = {'get_name.return_value': 'test_name',
+ 'needs_clean.return_value': False}
+ mock_test.configure_mock(**kwargs)
+ with self.assertRaises(Exception) as context:
+ self.runner.run_test(mock_test)
+ args[0].assert_called_with('test_name')
+ msg = "Cannot import the class for the test case."
+ self.assertTrue(msg in str(context.exception))
+
+ @mock.patch('importlib.import_module', name="module",
+ return_value=mock.Mock(test_class=mock.Mock(
+ side_effect=FakeModule)))
+ @mock.patch('xtesting.ci.run_tests.Runner.get_dict_by_test')
+ def test_run_tests_default(self, *args):
+ mock_test = mock.Mock()
+ kwargs = {'get_name.return_value': 'test_name',
+ 'needs_clean.return_value': True}
+ mock_test.configure_mock(**kwargs)
+ test_run_dict = {'module': 'test_module',
+ 'class': 'test_class'}
+ with mock.patch('xtesting.ci.run_tests.Runner.get_run_dict',
+ return_value=test_run_dict):
+ self.runner.clean_flag = True
+ self.runner.run_test(mock_test)
+ args[0].assert_called_with('test_name')
+ args[1].assert_called_with('test_module')
+ self.assertEqual(self.runner.overall_result,
+ run_tests.Result.EX_OK)
+
+ @mock.patch('xtesting.ci.run_tests.Runner.run_test',
+ return_value=TestCase.EX_OK)
+ def test_run_tier_default(self, *mock_methods):
+ self.assertEqual(self.runner.run_tier(self.tier),
+ run_tests.Result.EX_OK)
+ mock_methods[0].assert_called_with(mock.ANY)
+
+ @mock.patch('xtesting.ci.run_tests.LOGGER.info')
+ def test_run_tier_missing_test(self, mock_logger_info):
+ self.tier.get_tests.return_value = None
+ self.assertEqual(self.runner.run_tier(self.tier),
+ run_tests.Result.EX_ERROR)
+ self.assertTrue(mock_logger_info.called)
+
+ @mock.patch('xtesting.ci.run_tests.LOGGER.info')
+ @mock.patch('xtesting.ci.run_tests.Runner.run_tier')
+ @mock.patch('xtesting.ci.run_tests.Runner.summary')
+ def test_run_all_default(self, *mock_methods):
+ os.environ['CI_LOOP'] = 'test_ci_loop'
+ self.runner.run_all()
+ mock_methods[1].assert_not_called()
+ self.assertTrue(mock_methods[2].called)
+
+ @mock.patch('xtesting.ci.run_tests.LOGGER.info')
+ @mock.patch('xtesting.ci.run_tests.Runner.summary')
+ def test_run_all_missing_tier(self, *mock_methods):
+ os.environ['CI_LOOP'] = 'loop_re_not_available'
+ self.runner.run_all()
+ self.assertTrue(mock_methods[1].called)
+
+ @mock.patch('xtesting.ci.run_tests.Runner.source_envfile',
+ side_effect=Exception)
+ @mock.patch('xtesting.ci.run_tests.Runner.summary')
+ def test_main_failed(self, *mock_methods):
+ kwargs = {'test': 'test_name', 'noclean': True, 'report': True}
+ args = {'get_tier.return_value': False,
+ 'get_test.return_value': False}
+ self.runner.tiers = mock.Mock()
+ self.runner.tiers.configure_mock(**args)
+ self.assertEqual(self.runner.main(**kwargs),
+ run_tests.Result.EX_ERROR)
+ mock_methods[1].assert_called_once_with()
+
+ @mock.patch('xtesting.ci.run_tests.Runner.source_envfile')
+ @mock.patch('xtesting.ci.run_tests.Runner.run_test',
+ return_value=TestCase.EX_OK)
+ @mock.patch('xtesting.ci.run_tests.Runner.summary')
+ def test_main_tier(self, *mock_methods):
+ mock_tier = mock.Mock()
+ test_mock = mock.Mock()
+ test_mock.get_name.return_value = 'test1'
+ args = {'get_name.return_value': 'tier_name',
+ 'get_tests.return_value': [test_mock]}
+ mock_tier.configure_mock(**args)
+ kwargs = {'test': 'tier_name', 'noclean': True, 'report': True}
+ args = {'get_tier.return_value': mock_tier,
+ 'get_test.return_value': None}
+ self.runner.tiers = mock.Mock()
+ self.runner.tiers.configure_mock(**args)
+ self.assertEqual(self.runner.main(**kwargs),
+ run_tests.Result.EX_OK)
+ mock_methods[1].assert_called()
+
+ @mock.patch('xtesting.ci.run_tests.Runner.source_envfile')
+ @mock.patch('xtesting.ci.run_tests.Runner.run_test',
+ return_value=TestCase.EX_OK)
+ def test_main_test(self, *mock_methods):
+ kwargs = {'test': 'test_name', 'noclean': True, 'report': True}
+ args = {'get_tier.return_value': None,
+ 'get_test.return_value': 'test_name'}
+ self.runner.tiers = mock.Mock()
+ mock_methods[1].return_value = self.creds
+ self.runner.tiers.configure_mock(**args)
+ self.assertEqual(self.runner.main(**kwargs),
+ run_tests.Result.EX_OK)
+ mock_methods[0].assert_called_once_with('test_name')
+
+ @mock.patch('xtesting.ci.run_tests.Runner.source_envfile')
+ @mock.patch('xtesting.ci.run_tests.Runner.run_all')
+ @mock.patch('xtesting.ci.run_tests.Runner.summary')
+ def test_main_all_tier(self, *args):
+ kwargs = {'get_tier.return_value': None,
+ 'get_test.return_value': None}
+ self.runner.tiers = mock.Mock()
+ self.runner.tiers.configure_mock(**kwargs)
+ self.assertEqual(
+ self.runner.main(test='all', noclean=True, report=True),
+ run_tests.Result.EX_OK)
+ args[0].assert_called_once_with(None)
+ args[1].assert_called_once_with()
+ args[2].assert_called_once_with()
+
+ @mock.patch('xtesting.ci.run_tests.Runner.source_envfile')
+ def test_main_any_tier_test_ko(self, *args):
+ kwargs = {'get_tier.return_value': None,
+ 'get_test.return_value': None}
+ self.runner.tiers = mock.Mock()
+ self.runner.tiers.configure_mock(**kwargs)
+ self.assertEqual(
+ self.runner.main(test='any', noclean=True, report=True),
+ run_tests.Result.EX_ERROR)
+ args[0].assert_called_once_with()
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/ci/test_tier_builder.py b/xtesting/tests/unit/ci/test_tier_builder.py
new file mode 100644
index 00000000..22a44a58
--- /dev/null
+++ b/xtesting/tests/unit/ci/test_tier_builder.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from xtesting.ci import tier_builder
+
+
+class TierBuilderTesting(unittest.TestCase):
+
+ def setUp(self):
+ self.dependency = {
+ 'installer': 'test_installer', 'scenario': 'test_scenario'}
+ self.testcase = {
+ 'dependencies': self.dependency, 'enabled': 'true',
+ 'case_name': 'test_name', 'criteria': 'test_criteria',
+ 'blocking': 'test_blocking', 'description': 'test_desc',
+ 'project_name': 'project_name'}
+ self.dic_tier = {
+ 'name': 'test_tier', 'order': 'test_order',
+ 'ci_loop': 'test_ci_loop', 'description': 'test_desc',
+ 'testcases': [self.testcase]}
+ self.mock_yaml = mock.Mock()
+ attrs = {'get.return_value': [self.dic_tier]}
+ self.mock_yaml.configure_mock(**attrs)
+
+ with mock.patch('xtesting.ci.tier_builder.yaml.safe_load',
+ return_value=self.mock_yaml), \
+ mock.patch('six.moves.builtins.open', mock.mock_open()):
+ self.tierbuilder = tier_builder.TierBuilder(
+ 'test_installer', 'test_scenario', 'testcases_file')
+ self.tier_obj = self.tierbuilder.tier_objects[0]
+
+ def test_get_tiers(self):
+ self.assertEqual(self.tierbuilder.get_tiers(),
+ [self.tier_obj])
+
+ def test_get_tier_names(self):
+ self.assertEqual(self.tierbuilder.get_tier_names(),
+ ['test_tier'])
+
+ def test_get_tier_present_tier(self):
+ self.assertEqual(self.tierbuilder.get_tier('test_tier'),
+ self.tier_obj)
+
+ def test_get_tier_missing_tier(self):
+ self.assertEqual(self.tierbuilder.get_tier('test_tier2'),
+ None)
+
+ def test_get_test_present_test(self):
+ self.assertEqual(self.tierbuilder.get_test('test_name'),
+ self.tier_obj.get_test('test_name'))
+
+ def test_get_test_missing_test(self):
+ self.assertEqual(self.tierbuilder.get_test('test_name2'),
+ None)
+
+ def test_get_tests_present_tier(self):
+ self.assertEqual(self.tierbuilder.get_tests('test_tier'),
+ self.tier_obj.tests_array)
+
+ def test_get_tests_missing_tier(self):
+ self.assertEqual(self.tierbuilder.get_tests('test_tier2'),
+ None)
+
+ def test_get_tier_name_ok(self):
+ self.assertEqual(self.tierbuilder.get_tier_name('test_name'),
+ 'test_tier')
+
+ def test_get_tier_name_ko(self):
+ self.assertEqual(self.tierbuilder.get_tier_name('test_name2'), None)
+
+ def test_str(self):
+ message = str(self.tierbuilder)
+ self.assertTrue('test_tier' in message)
+ self.assertTrue('test_order' in message)
+ self.assertTrue('test_ci_loop' in message)
+ self.assertTrue('test_desc' in message)
+ self.assertTrue('test_name' in message)
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/ci/test_tier_handler.py b/xtesting/tests/unit/ci/test_tier_handler.py
new file mode 100644
index 00000000..d1900103
--- /dev/null
+++ b/xtesting/tests/unit/ci/test_tier_handler.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from xtesting.ci import tier_handler
+
+
+class TierHandlerTesting(unittest.TestCase):
+ # pylint: disable=too-many-public-methods
+
+ def setUp(self):
+ self.test = mock.Mock()
+ attrs = {'get_name.return_value': 'test_name'}
+ self.test.configure_mock(**attrs)
+ self.mock_depend = mock.Mock()
+ attrs = {'get_scenario.return_value': 'test_scenario',
+ 'get_installer.return_value': 'test_installer'}
+ self.mock_depend.configure_mock(**attrs)
+ self.tier = tier_handler.Tier(
+ 'test_tier', 'test_order', 'test_ci_loop', description='test_desc')
+ self.testcase = tier_handler.TestCase(
+ 'test_name', 'true', self.mock_depend, 'test_criteria',
+ True, description='test_desc', project='project_name')
+ self.dependency = tier_handler.Dependency(
+ 'test_installer', 'test_scenario')
+ self.testcase.str = self.testcase.__str__()
+ self.dependency.str = self.dependency.__str__()
+ self.tier.str = self.tier.__str__()
+
+ def test_split_text(self):
+ test_str = 'this is for testing'
+ self.assertEqual(tier_handler.split_text(test_str, 10),
+ ['this is ', 'for ', 'testing '])
+
+ def test_add_test(self):
+ self.tier.add_test(self.test)
+ self.assertEqual(self.tier.tests_array, [self.test])
+
+ def test_get_skipped_test1(self):
+ self.assertEqual(self.tier.get_skipped_test(), [])
+
+ def test_get_skipped_test2(self):
+ self.tier.skip_test(self.test)
+ self.assertEqual(self.tier.get_skipped_test(), [self.test])
+
+ def test_get_tests(self):
+ self.tier.tests_array = [self.test]
+ self.assertEqual(self.tier.get_tests(), [self.test])
+
+ def test_get_test_names(self):
+ self.tier.tests_array = [self.test]
+ self.assertEqual(self.tier.get_test_names(), ['test_name'])
+
+ def test_get_test(self):
+ self.tier.tests_array = [self.test]
+ with mock.patch.object(self.tier, 'is_test', return_value=True):
+ self.assertEqual(self.tier.get_test('test_name'), self.test)
+
+ def test_get_test_missing_test(self):
+ self.tier.tests_array = [self.test]
+ with mock.patch.object(self.tier, 'is_test', return_value=False):
+ self.assertEqual(self.tier.get_test('test_name'), None)
+
+ def test_get_name(self):
+ self.assertEqual(self.tier.get_name(), 'test_tier')
+
+ def test_get_order(self):
+ self.assertEqual(self.tier.get_order(), 'test_order')
+
+ def test_get_ci_loop(self):
+ self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop')
+
+ def test_testcase_is_none_in_item(self):
+ self.assertEqual(tier_handler.TestCase.is_none("item"), False)
+
+ def test_testcase_is_none_no_item(self):
+ self.assertEqual(tier_handler.TestCase.is_none(None), True)
+
+ def test_testcase_is_compatible(self):
+ self.assertEqual(
+ self.testcase.is_compatible('test_installer', 'test_scenario'),
+ True)
+
+ def test_testcase_is_compatible_2(self):
+ self.assertEqual(
+ self.testcase.is_compatible('missing_installer', 'test_scenario'),
+ False)
+ self.assertEqual(
+ self.testcase.is_compatible('test_installer', 'missing_scenario'),
+ False)
+
+ @mock.patch('re.search', side_effect=TypeError)
+ def test_testcase_is_compatible3(self, *args):
+ self.assertEqual(
+ self.testcase.is_compatible('test_installer', 'test_scenario'),
+ False)
+ args[0].assert_called_once_with('test_installer', 'test_installer')
+
+ def test_testcase_get_name(self):
+ self.assertEqual(self.tier.get_name(), 'test_tier')
+
+ def test_testcase_is_enabled(self):
+ self.assertEqual(self.testcase.is_enabled(), 'true')
+
+ def test_testcase_get_criteria(self):
+ self.assertEqual(self.testcase.get_criteria(), 'test_criteria')
+
+ def test_testcase_is_blocking(self):
+ self.assertTrue(self.testcase.is_blocking())
+
+ def test_testcase_get_project(self):
+ self.assertEqual(self.testcase.get_project(), 'project_name')
+
+ def test_testcase_get_order(self):
+ self.assertEqual(self.tier.get_order(), 'test_order')
+
+ def test_testcase_get_ci_loop(self):
+ self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop')
+
+ def test_dependency_get_installer(self):
+ self.assertEqual(self.dependency.get_installer(), 'test_installer')
+
+ def test_dependency_get_scenario(self):
+ self.assertEqual(self.dependency.get_scenario(), 'test_scenario')
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/core/__init__.py b/xtesting/tests/unit/core/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/unit/core/__init__.py
diff --git a/xtesting/tests/unit/core/test_feature.py b/xtesting/tests/unit/core/test_feature.py
new file mode 100644
index 00000000..9bbe5331
--- /dev/null
+++ b/xtesting/tests/unit/core/test_feature.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from xtesting.core import feature
+from xtesting.core import testcase
+
+
+class FeatureTestingBase(unittest.TestCase):
+
+ _case_name = "foo"
+ _project_name = "bar"
+ _repo = "dir_repo_bar"
+ _cmd = "run_bar_tests.py"
+ _output_file = '/home/opnfv/xtesting/results/foo.log'
+ feature = None
+
+ @mock.patch('time.time', side_effect=[1, 2])
+ def _test_run(self, status, mock_method=None):
+ self.assertEqual(self.feature.run(cmd=self._cmd), status)
+ if status == testcase.TestCase.EX_OK:
+ self.assertEqual(self.feature.result, 100)
+ else:
+ self.assertEqual(self.feature.result, 0)
+ mock_method.assert_has_calls([mock.call(), mock.call()])
+ self.assertEqual(self.feature.start_time, 1)
+ self.assertEqual(self.feature.stop_time, 2)
+
+ def test_logger_module_ko(self):
+ with mock.patch('six.moves.builtins.open'):
+ self.feature = feature.Feature(
+ project_name=self._project_name, case_name=self._case_name)
+ self.assertEqual(self.feature.logger.name, self._case_name)
+
+ def test_logger_module(self):
+ with mock.patch('six.moves.builtins.open'):
+ self.feature = feature.Feature(
+ project_name=self._project_name, case_name=self._case_name,
+ run={'module': 'bar'})
+ self.assertEqual(self.feature.logger.name, 'bar')
+
+
+class FeatureTesting(FeatureTestingBase):
+
+ def setUp(self):
+ # logging must be disabled else it calls time.time()
+ # what will break these unit tests.
+ logging.disable(logging.CRITICAL)
+ with mock.patch('six.moves.builtins.open'):
+ self.feature = feature.Feature(
+ project_name=self._project_name, case_name=self._case_name)
+
+ def test_run_exc(self):
+ # pylint: disable=bad-continuation
+ with mock.patch.object(
+ self.feature, 'execute',
+ side_effect=Exception) as mock_method:
+ self._test_run(testcase.TestCase.EX_RUN_ERROR)
+ mock_method.assert_called_once_with(cmd=self._cmd)
+
+ def test_run(self):
+ self._test_run(testcase.TestCase.EX_RUN_ERROR)
+
+
+class BashFeatureTesting(FeatureTestingBase):
+
+ def setUp(self):
+ # logging must be disabled else it calls time.time()
+ # what will break these unit tests.
+ logging.disable(logging.CRITICAL)
+ with mock.patch('six.moves.builtins.open'):
+ self.feature = feature.BashFeature(
+ project_name=self._project_name, case_name=self._case_name)
+
+ @mock.patch('subprocess.Popen')
+ def test_run_no_cmd(self, mock_subproc):
+ self.assertEqual(
+ self.feature.run(), testcase.TestCase.EX_RUN_ERROR)
+ mock_subproc.assert_not_called()
+
+ @mock.patch('subprocess.Popen')
+ def test_run_ko(self, mock_subproc):
+ with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen:
+ mock_obj = mock.Mock()
+ attrs = {'wait.return_value': 1}
+ mock_obj.configure_mock(**attrs)
+
+ mock_subproc.return_value = mock_obj
+ self._test_run(testcase.TestCase.EX_RUN_ERROR)
+ mopen.assert_called_once_with(self._output_file, "w+")
+
+ @mock.patch('subprocess.Popen')
+ def test_run(self, mock_subproc):
+ with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen:
+ mock_obj = mock.Mock()
+ attrs = {'wait.return_value': 0}
+ mock_obj.configure_mock(**attrs)
+
+ mock_subproc.return_value = mock_obj
+ self._test_run(testcase.TestCase.EX_OK)
+ mopen.assert_called_once_with(self._output_file, "w+")
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/core/test_robotframework.py b/xtesting/tests/unit/core/test_robotframework.py
new file mode 100644
index 00000000..7131b7e2
--- /dev/null
+++ b/xtesting/tests/unit/core/test_robotframework.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the classes required to fully cover robot."""
+
+import errno
+import logging
+import os
+import unittest
+
+import mock
+from robot.errors import DataError, RobotError
+from robot.result import model
+from robot.utils.robottime import timestamp_to_secs
+
+from xtesting.core import robotframework
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+
+class ResultVisitorTesting(unittest.TestCase):
+
+ """The class testing ResultVisitor."""
+ # pylint: disable=missing-docstring
+
+ def setUp(self):
+ self.visitor = robotframework.ResultVisitor()
+
+ def test_empty(self):
+ self.assertFalse(self.visitor.get_data())
+
+ def test_ok(self):
+ data = {'name': 'foo',
+ 'parent': 'bar',
+ 'status': 'PASS',
+ 'starttime': "20161216 16:00:00.000",
+ 'endtime': "20161216 16:00:01.000",
+ 'elapsedtime': 1000,
+ 'text': 'Hello, World!',
+ 'critical': True}
+ test = model.TestCase(
+ name=data['name'], status=data['status'], message=data['text'],
+ starttime=data['starttime'], endtime=data['endtime'])
+ test.parent = mock.Mock()
+ config = {'name': data['parent'],
+ 'criticality.test_is_critical.return_value': data[
+ 'critical']}
+ test.parent.configure_mock(**config)
+ self.visitor.visit_test(test)
+ self.assertEqual(self.visitor.get_data(), [data])
+
+
+class ParseResultTesting(unittest.TestCase):
+
+ """The class testing RobotFramework.parse_results()."""
+ # pylint: disable=missing-docstring
+
+ _config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000',
+ 'endtime': '20161216 16:00:01.000'}
+
+ def setUp(self):
+ self.test = robotframework.RobotFramework(
+ case_name='robot', project_name='xtesting')
+
+ @mock.patch('robot.api.ExecutionResult', side_effect=DataError)
+ def test_raises_exc(self, mock_method):
+ with self.assertRaises(DataError):
+ self.test.parse_results()
+ mock_method.assert_called_once_with(
+ os.path.join(self.test.res_dir, 'output.xml'))
+
+ def _test_result(self, config, result):
+ suite = mock.Mock()
+ suite.configure_mock(**config)
+ with mock.patch('robot.api.ExecutionResult',
+ return_value=mock.Mock(suite=suite)):
+ self.test.parse_results()
+ self.assertEqual(self.test.result, result)
+ self.assertEqual(self.test.start_time,
+ timestamp_to_secs(config['starttime']))
+ self.assertEqual(self.test.stop_time,
+ timestamp_to_secs(config['endtime']))
+ self.assertEqual(self.test.details,
+ {'description': config['name'], 'tests': []})
+
+ def test_null_passed(self):
+ self._config.update({'statistics.critical.passed': 0,
+ 'statistics.critical.total': 20})
+ self._test_result(self._config, 0)
+
+ def test_no_test(self):
+ self._config.update({'statistics.critical.passed': 20,
+ 'statistics.critical.total': 0})
+ self._test_result(self._config, 0)
+
+ def test_half_success(self):
+ self._config.update({'statistics.critical.passed': 10,
+ 'statistics.critical.total': 20})
+ self._test_result(self._config, 50)
+
+ def test_success(self):
+ self._config.update({'statistics.critical.passed': 20,
+ 'statistics.critical.total': 20})
+ self._test_result(self._config, 100)
+
+
+class RunTesting(unittest.TestCase):
+
+ """The class testing RobotFramework.run()."""
+ # pylint: disable=missing-docstring
+
+ suites = ["foo"]
+ variable = []
+ variablefile = []
+
+ def setUp(self):
+ self.test = robotframework.RobotFramework(
+ case_name='robot', project_name='xtesting')
+
+ def test_exc_key_error(self):
+ self.assertEqual(self.test.run(), self.test.EX_RUN_ERROR)
+
+ @mock.patch('robot.run')
+ def _test_makedirs_exc(self, *args):
+ with mock.patch.object(self.test, 'parse_results') as mock_method:
+ self.assertEqual(
+ self.test.run(
+ suites=self.suites, variable=self.variable,
+ variablefile=self.variablefile),
+ self.test.EX_RUN_ERROR)
+ args[0].assert_not_called()
+ mock_method.asser_not_called()
+
+ @mock.patch('os.makedirs', side_effect=Exception)
+ def test_makedirs_exc(self, *args):
+ self._test_makedirs_exc()
+ args[0].assert_called_once_with(self.test.res_dir)
+
+ @mock.patch('os.makedirs', side_effect=OSError)
+ def test_makedirs_oserror(self, *args):
+ self._test_makedirs_exc()
+ args[0].assert_called_once_with(self.test.res_dir)
+
+ @mock.patch('robot.run')
+ def _test_makedirs(self, *args):
+ with mock.patch.object(self.test, 'parse_results') as mock_method:
+ self.assertEqual(
+ self.test.run(suites=self.suites, variable=self.variable),
+ self.test.EX_OK)
+ args[0].assert_called_once_with(
+ *self.suites, log='NONE', output=self.test.xml_file,
+ report='NONE', stdout=mock.ANY, variable=self.variable,
+ variablefile=self.variablefile)
+ mock_method.assert_called_once_with()
+
+ @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
+ def test_makedirs_oserror17(self, *args):
+ self._test_makedirs()
+ args[0].assert_called_once_with(self.test.res_dir)
+
+ @mock.patch('os.makedirs')
+ def test_makedirs(self, *args):
+ self._test_makedirs()
+ args[0].assert_called_once_with(self.test.res_dir)
+
+ @mock.patch('robot.run')
+ def _test_parse_results(self, status, *args):
+ self.assertEqual(
+ self.test.run(
+ suites=self.suites, variable=self.variable,
+ variablefile=self.variablefile),
+ status)
+ args[0].assert_called_once_with(
+ *self.suites, log='NONE', output=self.test.xml_file,
+ report='NONE', stdout=mock.ANY, variable=self.variable,
+ variablefile=self.variablefile)
+
+ def test_parse_results_exc(self):
+ with mock.patch.object(self.test, 'parse_results',
+ side_effect=Exception) as mock_method:
+ self._test_parse_results(self.test.EX_RUN_ERROR)
+ mock_method.assert_called_once_with()
+
+ def test_parse_results_robot_error(self):
+ with mock.patch.object(self.test, 'parse_results',
+ side_effect=RobotError('foo')) as mock_method:
+ self._test_parse_results(self.test.EX_RUN_ERROR)
+ mock_method.assert_called_once_with()
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/core/test_testcase.py b/xtesting/tests/unit/core/test_testcase.py
new file mode 100644
index 00000000..e2f56f8f
--- /dev/null
+++ b/xtesting/tests/unit/core/test_testcase.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the class required to fully cover testcase."""
+
+from datetime import datetime
+import json
+import logging
+import os
+import unittest
+
+from xtesting.core import testcase
+
+import mock
+import requests
+
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+
+class TestCaseTesting(unittest.TestCase):
+ """The class testing TestCase."""
+
+ # pylint: disable=missing-docstring,too-many-public-methods
+
+ _case_name = "base"
+ _project_name = "xtesting"
+ _published_result = "PASS"
+ _test_db_url = "http://testresults.opnfv.org/test/api/v1/results"
+ _headers = {'Content-Type': 'application/json'}
+
+ def setUp(self):
+ self.test = testcase.TestCase(case_name=self._case_name,
+ project_name=self._project_name)
+ self.test.start_time = 1
+ self.test.stop_time = 2
+ self.test.result = 100
+ self.test.details = {"Hello": "World"}
+ os.environ['TEST_DB_URL'] = TestCaseTesting._test_db_url
+ os.environ['INSTALLER_TYPE'] = "installer_type"
+ os.environ['DEPLOY_SCENARIO'] = "scenario"
+ os.environ['NODE_NAME'] = "node_name"
+ os.environ['BUILD_TAG'] = "foo-daily-master-bar"
+
+ def test_run_unimplemented(self):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_RUN_ERROR)
+
+ def _test_pushdb_missing_attribute(self):
+ self.assertEqual(self.test.push_to_db(),
+ testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+
+ def test_pushdb_no_project_name(self):
+ self.test.project_name = None
+ self._test_pushdb_missing_attribute()
+
+ def test_pushdb_no_case_name(self):
+ self.test.case_name = None
+ self._test_pushdb_missing_attribute()
+
+ def test_pushdb_no_start_time(self):
+ self.test.start_time = None
+ self._test_pushdb_missing_attribute()
+
+ def test_pushdb_no_stop_time(self):
+ self.test.stop_time = None
+ self._test_pushdb_missing_attribute()
+
+ def _test_pushdb_missing_env(self, var):
+ del os.environ[var]
+ self.assertEqual(self.test.push_to_db(),
+ testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+
+ def test_pushdb_no_db_url(self):
+ self._test_pushdb_missing_env('TEST_DB_URL')
+
+ def test_pushdb_no_installer_type(self):
+ self._test_pushdb_missing_env('INSTALLER_TYPE')
+
+ def test_pushdb_no_deploy_scenario(self):
+ self._test_pushdb_missing_env('DEPLOY_SCENARIO')
+
+ def test_pushdb_no_node_name(self):
+ self._test_pushdb_missing_env('NODE_NAME')
+
+ def test_pushdb_no_build_tag(self):
+ self._test_pushdb_missing_env('BUILD_TAG')
+
+ @mock.patch('requests.post')
+ def test_pushdb_bad_start_time(self, mock_function=None):
+ self.test.start_time = "1"
+ self.assertEqual(
+ self.test.push_to_db(),
+ testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+ mock_function.assert_not_called()
+
+ @mock.patch('requests.post')
+ def test_pushdb_bad_end_time(self, mock_function=None):
+ self.test.stop_time = "2"
+ self.assertEqual(
+ self.test.push_to_db(),
+ testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+ mock_function.assert_not_called()
+
+ def _get_data(self):
+ return {
+ "build_tag": os.environ['BUILD_TAG'],
+ "case_name": self._case_name,
+ "criteria": 'PASS' if self.test.is_successful(
+ ) == self.test.EX_OK else 'FAIL',
+ "details": self.test.details,
+ "installer": os.environ['INSTALLER_TYPE'],
+ "pod_name": os.environ['NODE_NAME'],
+ "project_name": self.test.project_name,
+ "scenario": os.environ['DEPLOY_SCENARIO'],
+ "start_date": datetime.fromtimestamp(
+ self.test.start_time).strftime('%Y-%m-%d %H:%M:%S'),
+ "stop_date": datetime.fromtimestamp(
+ self.test.stop_time).strftime('%Y-%m-%d %H:%M:%S'),
+ "version": "master"}
+
+ @mock.patch('requests.post')
+ def _test_pushdb_version(self, mock_function=None, **kwargs):
+ payload = self._get_data()
+ payload["version"] = kwargs.get("version", "unknown")
+ self.assertEqual(self.test.push_to_db(), testcase.TestCase.EX_OK)
+ mock_function.assert_called_once_with(
+ os.environ['TEST_DB_URL'],
+ data=json.dumps(payload, sort_keys=True),
+ headers=self._headers)
+
+ def test_pushdb_daily_job(self):
+ self._test_pushdb_version(version="master")
+
+ def test_pushdb_weekly_job(self):
+ os.environ['BUILD_TAG'] = 'foo-weekly-master-bar'
+ self._test_pushdb_version(version="master")
+
+ def test_pushdb_random_build_tag(self):
+ os.environ['BUILD_TAG'] = 'whatever'
+ self._test_pushdb_version(version="unknown")
+
+ @mock.patch('requests.post', return_value=mock.Mock(
+ raise_for_status=mock.Mock(
+ side_effect=requests.exceptions.HTTPError)))
+ def test_pushdb_http_errors(self, mock_function=None):
+ self.assertEqual(
+ self.test.push_to_db(),
+ testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+ mock_function.assert_called_once_with(
+ os.environ['TEST_DB_URL'],
+ data=json.dumps(self._get_data(), sort_keys=True),
+ headers=self._headers)
+
+ def test_check_criteria_missing(self):
+ self.test.criteria = None
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_check_result_missing(self):
+ self.test.result = None
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_check_result_failed(self):
+ # Backward compatibility
+ # It must be removed as soon as TestCase subclasses
+ # stop setting result = 'PASS' or 'FAIL'.
+ self.test.result = 'FAIL'
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_check_result_pass(self):
+ # Backward compatibility
+ # It must be removed as soon as TestCase subclasses
+ # stop setting result = 'PASS' or 'FAIL'.
+ self.test.result = 'PASS'
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_OK)
+
+ def test_check_result_lt(self):
+ self.test.result = 50
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_check_result_eq(self):
+ self.test.result = 100
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_OK)
+
+ def test_check_result_gt(self):
+ self.test.criteria = 50
+ self.test.result = 100
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_OK)
+
+ def test_check_result_zero(self):
+ self.test.criteria = 0
+ self.test.result = 0
+ self.assertEqual(self.test.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_get_duration_start_ko(self):
+ self.test.start_time = None
+ self.assertEqual(self.test.get_duration(), "XX:XX")
+ self.test.start_time = 0
+ self.assertEqual(self.test.get_duration(), "XX:XX")
+
+ def test_get_duration_end_ko(self):
+ self.test.stop_time = None
+ self.assertEqual(self.test.get_duration(), "XX:XX")
+ self.test.stop_time = 0
+ self.assertEqual(self.test.get_duration(), "XX:XX")
+
+ def test_get_invalid_duration(self):
+ self.test.start_time = 2
+ self.test.stop_time = 1
+ self.assertEqual(self.test.get_duration(), "XX:XX")
+
+ def test_get_zero_duration(self):
+ self.test.start_time = 2
+ self.test.stop_time = 2
+ self.assertEqual(self.test.get_duration(), "00:00")
+
+ def test_get_duration(self):
+ self.test.start_time = 1
+ self.test.stop_time = 180
+ self.assertEqual(self.test.get_duration(), "02:59")
+
+ def test_str_project_name_ko(self):
+ self.test.project_name = None
+ self.assertIn("<xtesting.core.testcase.TestCase object at",
+ str(self.test))
+
+ def test_str_case_name_ko(self):
+ self.test.case_name = None
+ self.assertIn("<xtesting.core.testcase.TestCase object at",
+ str(self.test))
+
+ def test_str_pass(self):
+ duration = '01:01'
+ with mock.patch.object(self.test, 'get_duration',
+ return_value=duration), \
+ mock.patch.object(self.test, 'is_successful',
+ return_value=testcase.TestCase.EX_OK):
+ message = str(self.test)
+ self.assertIn(self._project_name, message)
+ self.assertIn(self._case_name, message)
+ self.assertIn(duration, message)
+ self.assertIn('PASS', message)
+
+ def test_str_fail(self):
+ duration = '00:59'
+ with mock.patch.object(self.test, 'get_duration',
+ return_value=duration), \
+ mock.patch.object(
+ self.test, 'is_successful',
+ return_value=testcase.TestCase.EX_TESTCASE_FAILED):
+ message = str(self.test)
+ self.assertIn(self._project_name, message)
+ self.assertIn(self._case_name, message)
+ self.assertIn(duration, message)
+ self.assertIn('FAIL', message)
+
+ def test_clean(self):
+ self.assertEqual(self.test.clean(), None)
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/core/test_unit.py b/xtesting/tests/unit/core/test_unit.py
new file mode 100644
index 00000000..8afe0bde
--- /dev/null
+++ b/xtesting/tests/unit/core/test_unit.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from xtesting.core import unit
+from xtesting.core import testcase
+
+
+class PyTestSuiteRunnerTesting(unittest.TestCase):
+
+ def setUp(self):
+ self.psrunner = unit.Suite()
+ self.psrunner.suite = "foo"
+
+ @mock.patch('unittest.TestLoader')
+ def _test_run(self, mock_class=None, result=mock.Mock(),
+ status=testcase.TestCase.EX_OK):
+ with mock.patch('xtesting.core.unit.unittest.TextTestRunner.run',
+ return_value=result):
+ self.assertEqual(self.psrunner.run(), status)
+ mock_class.assert_not_called()
+
+ def test_check_suite_null(self):
+ self.assertEqual(unit.Suite().suite, None)
+ self.psrunner.suite = None
+ self._test_run(result=mock.Mock(),
+ status=testcase.TestCase.EX_RUN_ERROR)
+
+ def test_run_no_ut(self):
+ mock_result = mock.Mock(testsRun=0, errors=[], failures=[])
+ self._test_run(result=mock_result,
+ status=testcase.TestCase.EX_RUN_ERROR)
+ self.assertEqual(self.psrunner.result, 0)
+ self.assertEqual(self.psrunner.details,
+ {'errors': 0, 'failures': 0, 'stream': '',
+ 'testsRun': 0})
+ self.assertEqual(self.psrunner.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_result_ko(self):
+ self.psrunner.criteria = 100
+ mock_result = mock.Mock(testsRun=50, errors=[('test1', 'error_msg1')],
+ failures=[('test2', 'failure_msg1')])
+ self._test_run(result=mock_result)
+ self.assertEqual(self.psrunner.result, 96)
+ self.assertEqual(self.psrunner.details,
+ {'errors': 1, 'failures': 1, 'stream': '',
+ 'testsRun': 50})
+ self.assertEqual(self.psrunner.is_successful(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_result_ok(self):
+ mock_result = mock.Mock(testsRun=50, errors=[],
+ failures=[])
+ self._test_run(result=mock_result)
+ self.assertEqual(self.psrunner.result, 100)
+ self.assertEqual(self.psrunner.details,
+ {'errors': 0, 'failures': 0, 'stream': '',
+ 'testsRun': 50})
+ self.assertEqual(self.psrunner.is_successful(),
+ testcase.TestCase.EX_OK)
+
+ @mock.patch('unittest.TestLoader')
+ def test_run_name_exc(self, mock_class=None):
+ mock_obj = mock.Mock(side_effect=ImportError)
+ mock_class.side_effect = mock_obj
+ self.assertEqual(self.psrunner.run(name='foo'),
+ testcase.TestCase.EX_RUN_ERROR)
+ mock_class.assert_called_once_with()
+ mock_obj.assert_called_once_with()
+
+ @mock.patch('unittest.TestLoader')
+ def test_run_name(self, mock_class=None):
+ mock_result = mock.Mock(testsRun=50, errors=[],
+ failures=[])
+ mock_obj = mock.Mock()
+ mock_class.side_effect = mock_obj
+ with mock.patch('xtesting.core.unit.unittest.TextTestRunner.run',
+ return_value=mock_result):
+ self.assertEqual(self.psrunner.run(name='foo'),
+ testcase.TestCase.EX_OK)
+ mock_class.assert_called_once_with()
+ mock_obj.assert_called_once_with()
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/core/test_vnf.py b/xtesting/tests/unit/core/test_vnf.py
new file mode 100644
index 00000000..ec8a783e
--- /dev/null
+++ b/xtesting/tests/unit/core/test_vnf.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2016 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import unittest
+
+import mock
+
+from xtesting.core import vnf
+from xtesting.core import testcase
+from xtesting.utils import constants
+
+from snaps.openstack.os_credentials import OSCreds
+
+
+class VnfBaseTesting(unittest.TestCase):
+ """The class testing VNF."""
+ # pylint: disable=missing-docstring,too-many-public-methods
+
+ tenant_name = 'test_tenant_name'
+ tenant_description = 'description'
+
+ def setUp(self):
+ self.test = vnf.VnfOnBoarding(project='xtesting', case_name='foo')
+
+ def test_run_deploy_orch_exc(self):
+ with mock.patch.object(self.test, 'prepare'), \
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ side_effect=Exception) as mock_method, \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+ mock_method.assert_called_with()
+
+ def test_run_deploy_vnf_exc(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ side_effect=Exception) as mock_method:
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+ mock_method.assert_called_with()
+
+ def test_run_test_vnf_exc(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf', return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ side_effect=Exception) as mock_method:
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+ mock_method.assert_called_with()
+
+ def test_run_deploy_orch_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=False), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_vnf_deploy_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=False), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_vnf_test_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=False):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_default(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(), testcase.TestCase.EX_OK)
+
+ @mock.patch('xtesting.core.vnf.OpenStackUser')
+ @mock.patch('xtesting.core.vnf.OpenStackProject')
+ @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials',
+ side_effect=Exception)
+ def test_prepare_exc1(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+ args[0].assert_called_with(os_env_file=constants.ENV_FILE)
+ args[1].assert_not_called()
+ args[2].assert_not_called()
+
+ @mock.patch('xtesting.core.vnf.OpenStackUser')
+ @mock.patch('xtesting.core.vnf.OpenStackProject', side_effect=Exception)
+ @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials')
+ def test_prepare_exc2(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+ args[0].assert_called_with(os_env_file=constants.ENV_FILE)
+ args[1].assert_called_with(mock.ANY, mock.ANY)
+ args[2].assert_not_called()
+
+ @mock.patch('xtesting.core.vnf.OpenStackUser', side_effect=Exception)
+ @mock.patch('xtesting.core.vnf.OpenStackProject')
+ @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials')
+ def test_prepare_exc3(self, *args):
+ with self.assertRaises(Exception):
+ self.test.prepare()
+ args[0].assert_called_with(os_env_file=constants.ENV_FILE)
+ args[1].assert_called_with(mock.ANY, mock.ANY)
+ args[2].assert_called_with(mock.ANY, mock.ANY)
+
+ @mock.patch('xtesting.core.vnf.OpenStackUser')
+ @mock.patch('xtesting.core.vnf.OpenStackProject')
+ @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials')
+ def test_prepare_default(self, *args):
+ self.assertEqual(self.test.prepare(), testcase.TestCase.EX_OK)
+ args[0].assert_called_with(os_env_file=constants.ENV_FILE)
+ args[1].assert_called_with(mock.ANY, mock.ANY)
+ args[2].assert_called_with(mock.ANY, mock.ANY)
+
+ def test_deploy_vnf_unimplemented(self):
+ with self.assertRaises(vnf.VnfDeploymentException):
+ self.test.deploy_vnf()
+
+ def test_test_vnf_unimplemented(self):
+ with self.assertRaises(vnf.VnfTestException):
+ self.test.test_vnf()
+
+ def test_deploy_orch_unimplemented(self):
+ self.assertTrue(self.test.deploy_orchestrator())
+
+ @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials',
+ return_value=OSCreds(
+ username='user', password='pass',
+ auth_url='http://foo.com:5000/v3', project_name='bar'),
+ side_effect=Exception)
+ def test_prepare_keystone_client_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
+ self.test.prepare()
+ args[0].assert_called_once()
+
+ def test_vnf_clean_exc(self):
+ obj = mock.Mock()
+ obj.clean.side_effect = Exception
+ self.test.created_object = [obj]
+ self.test.clean()
+ obj.clean.assert_called_with()
+
+ def test_vnf_clean(self):
+ obj = mock.Mock()
+ self.test.created_object = [obj]
+ self.test.clean()
+ obj.clean.assert_called_with()
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/energy/__init__.py b/xtesting/tests/unit/energy/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/unit/energy/__init__.py
diff --git a/xtesting/tests/unit/energy/test_functest_energy.py b/xtesting/tests/unit/energy/test_functest_energy.py
new file mode 100644
index 00000000..ea83c1ea
--- /dev/null
+++ b/xtesting/tests/unit/energy/test_functest_energy.py
@@ -0,0 +1,371 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Unitary test for energy module."""
+# pylint: disable=unused-argument
+import logging
+import os
+import unittest
+
+import mock
+import requests
+
+from xtesting.energy.energy import EnergyRecorder
+import xtesting.energy.energy as energy
+
+CASE_NAME = "UNIT_TEST_CASE"
+STEP_NAME = "UNIT_TEST_STEP"
+
+PREVIOUS_SCENARIO = "previous_scenario"
+PREVIOUS_STEP = "previous_step"
+
+
+class MockHttpResponse(object): # pylint: disable=too-few-public-methods
+ """Mock response for Energy recorder API."""
+
+ def __init__(self, text, status_code):
+ """Create an instance of MockHttpResponse."""
+ self.text = text
+ self.status_code = status_code
+
+
+API_OK = MockHttpResponse(
+ '{"status": "OK"}',
+ 200
+)
+API_KO = MockHttpResponse(
+ '{"message": "API-KO"}',
+ 500
+)
+
+RECORDER_OK = MockHttpResponse(
+ '{"environment": "UNIT_TEST",'
+ ' "step": "string",'
+ ' "scenario": "' + CASE_NAME + '"}',
+ 200
+)
+RECORDER_KO = MockHttpResponse(
+ '{"message": "An unhandled API exception occurred (MOCK)"}',
+ 500
+)
+RECORDER_NOT_FOUND = MockHttpResponse(
+ '{"message": "Recorder not found (MOCK)"}',
+ 404
+)
+
+
+# pylint: disable=too-many-public-methods
+class EnergyRecorderTest(unittest.TestCase):
+ """Energy module unitary test suite."""
+
+ case_name = CASE_NAME
+ request_headers = {'content-type': 'application/json'}
+ returned_value_to_preserve = "value"
+ exception_message_to_preserve = "exception_message"
+
+ @staticmethod
+ def _set_env_creds():
+ """Set config values."""
+ os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
+ os.environ["ENERGY_RECORDER_API_USER"] = "user"
+ os.environ["ENERGY_RECORDER_API_PASSWORD"] = "password"
+
+ @staticmethod
+ def _set_env_nocreds():
+ """Set config values."""
+ os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
+ del os.environ["ENERGY_RECORDER_API_USER"]
+ del os.environ["ENERGY_RECORDER_API_PASSWORD"]
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_OK)
+ def test_start(self, post_mock=None, get_mock=None):
+ """EnergyRecorder.start method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_start_error(self, post_mock=None):
+ """EnergyRecorder.start method (error in method)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_start_exception(self, conf_loader_mock=None):
+ """EnergyRecorder.start test with exception during execution."""
+ start_status = EnergyRecorder.start(CASE_NAME)
+ self.assertFalse(start_status)
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_KO)
+ def test_start_api_error(self, post_mock=None):
+ """EnergyRecorder.start method (API error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_OK)
+ def test_set_step(self, post_mock=None):
+ """EnergyRecorder.set_step method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_KO)
+ def test_set_step_api_error(self, post_mock=None):
+ """EnergyRecorder.set_step method (API error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_set_step_error(self, post_mock=None):
+ """EnergyRecorder.set_step method (method error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
+ side_effect=requests.exceptions.ConnectionError())
+ def test_set_step_connection_error(self, conf_loader_mock=None):
+ """EnergyRecorder.start test with exception during execution."""
+ step_status = EnergyRecorder.set_step(STEP_NAME)
+ self.assertFalse(step_status)
+
+ @mock.patch('xtesting.energy.energy.requests.delete',
+ return_value=RECORDER_OK)
+ def test_stop(self, delete_mock=None):
+ """EnergyRecorder.stop method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.delete',
+ return_value=RECORDER_KO)
+ def test_stop_api_error(self, delete_mock=None):
+ """EnergyRecorder.stop method (API Error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.delete',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_stop_error(self, delete_mock=None):
+ """EnergyRecorder.stop method (method error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @energy.enable_recording
+ def __decorated_method(self):
+ """Call with to energy recorder decorators."""
+ return self.returned_value_to_preserve
+
+ @energy.enable_recording
+ def __decorated_method_with_ex(self):
+ """Call with to energy recorder decorators."""
+ raise Exception(self.exception_message_to_preserve)
+
+ @mock.patch("xtesting.energy.energy.EnergyRecorder.get_current_scenario",
+ return_value=None)
+ @mock.patch("xtesting.energy.energy.EnergyRecorder")
+ def test_decorators(self,
+ recorder_mock=None,
+ cur_scenario_mock=None):
+ """Test energy module decorators."""
+ self.__decorated_method()
+ calls = [mock.call.start(self.case_name),
+ mock.call.stop()]
+ recorder_mock.assert_has_calls(calls)
+
+ @mock.patch("xtesting.energy.energy.EnergyRecorder.get_current_scenario",
+ return_value={"scenario": PREVIOUS_SCENARIO,
+ "step": PREVIOUS_STEP})
+ @mock.patch("xtesting.energy.energy.EnergyRecorder")
+ def test_decorators_with_previous(self,
+ recorder_mock=None,
+ cur_scenario_mock=None):
+ """Test energy module decorators."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_creds()
+ self.__decorated_method()
+ calls = [mock.call.start(self.case_name),
+ mock.call.submit_scenario(PREVIOUS_SCENARIO,
+ PREVIOUS_STEP)]
+ recorder_mock.assert_has_calls(calls, True)
+
+ def test_decorator_preserve_return(self):
+ """Test that decorator preserve method returned value."""
+ self.test_load_config()
+ self.assertTrue(
+ self.__decorated_method() == self.returned_value_to_preserve
+ )
+
+ @mock.patch(
+ "xtesting.energy.energy.finish_session")
+ def test_decorator_preserve_ex(self, finish_mock=None):
+ """Test that decorator preserve method exceptions."""
+ self.test_load_config()
+ with self.assertRaises(Exception) as context:
+ self.__decorated_method_with_ex()
+ self.assertTrue(
+ self.exception_message_to_preserve in str(context.exception)
+ )
+ self.assertTrue(finish_mock.called)
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_OK)
+ def test_load_config(self, loader_mock=None, get_mock=None):
+ """Test load config."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_creds()
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["auth"],
+ ("user", "password")
+ )
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["uri"],
+ "http://pod-uri:8888/recorders/environment/MOCK_POD"
+ )
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_OK)
+ def test_load_config_no_creds(self, loader_mock=None, get_mock=None):
+ """Test load config without creds."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_nocreds()
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api["auth"], None)
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["uri"],
+ "http://pod-uri:8888/recorders/environment/MOCK_POD"
+ )
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_OK)
+ def test_load_config_ex(self, loader_mock=None, get_mock=None):
+ """Test load config with exception."""
+ for key in ['NODE_NAME', 'ENERGY_RECORDER_API_URL']:
+ os.environ[key] = ''
+ with self.assertRaises(AssertionError):
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api, None)
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_KO)
+ def test_load_config_api_ko(self, loader_mock=None, get_mock=None):
+ """Test load config with API unavailable."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_creds()
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api["available"],
+ False)
+
+ @mock.patch('xtesting.energy.energy.requests.get',
+ return_value=RECORDER_OK)
+ def test_get_current_scenario(self, loader_mock=None, get_mock=None):
+ """Test get_current_scenario."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self.test_load_config()
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is not None)
+
+ @mock.patch('xtesting.energy.energy.requests.get',
+ return_value=RECORDER_NOT_FOUND)
+ def test_current_scenario_not_found(self, get_mock=None):
+ """Test get current scenario not existing."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self.test_load_config()
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is None)
+
+ @mock.patch('xtesting.energy.energy.requests.get',
+ return_value=RECORDER_KO)
+ def test_current_scenario_api_error(self, get_mock=None):
+ """Test get current scenario with API error."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self.test_load_config()
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is None)
+
+ @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_current_scenario_exception(self, get_mock=None):
+ """Test get current scenario with exception."""
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is None)
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/utils/__init__.py b/xtesting/tests/unit/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/unit/utils/__init__.py
diff --git a/xtesting/tests/unit/utils/test_decorators.py b/xtesting/tests/unit/utils/test_decorators.py
new file mode 100644
index 00000000..83b182a8
--- /dev/null
+++ b/xtesting/tests/unit/utils/test_decorators.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Define the class required to fully cover decorators."""
+
+from datetime import datetime
+import errno
+import json
+import logging
+import os
+import unittest
+
+import mock
+
+from xtesting.core import testcase
+from xtesting.utils import decorators
+
+__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
+
+DIR = '/dev'
+FILE = '{}/null'.format(DIR)
+URL = 'file://{}'.format(FILE)
+
+
+class DecoratorsTesting(unittest.TestCase):
+ # pylint: disable=missing-docstring
+
+ _case_name = 'base'
+ _project_name = 'xtesting'
+ _start_time = 1.0
+ _stop_time = 2.0
+ _result = 'PASS'
+ _version = 'unknown'
+ _build_tag = 'none'
+ _node_name = 'bar'
+ _deploy_scenario = 'foo'
+ _installer_type = 'debian'
+
+ def setUp(self):
+ os.environ['INSTALLER_TYPE'] = self._installer_type
+ os.environ['DEPLOY_SCENARIO'] = self._deploy_scenario
+ os.environ['NODE_NAME'] = self._node_name
+ os.environ['BUILD_TAG'] = self._build_tag
+
+ def test_wraps(self):
+ self.assertEqual(testcase.TestCase.push_to_db.__name__,
+ "push_to_db")
+
+ def _get_json(self):
+ stop_time = datetime.fromtimestamp(self._stop_time).strftime(
+ '%Y-%m-%d %H:%M:%S')
+ start_time = datetime.fromtimestamp(self._start_time).strftime(
+ '%Y-%m-%d %H:%M:%S')
+ data = {'project_name': self._project_name,
+ 'stop_date': stop_time, 'start_date': start_time,
+ 'case_name': self._case_name, 'build_tag': self._build_tag,
+ 'pod_name': self._node_name, 'installer': self._installer_type,
+ 'scenario': self._deploy_scenario, 'version': self._version,
+ 'details': {}, 'criteria': self._result}
+ return json.dumps(data, sort_keys=True)
+
+ def _get_testcase(self):
+ test = testcase.TestCase(
+ project_name=self._project_name, case_name=self._case_name)
+ test.start_time = self._start_time
+ test.stop_time = self._stop_time
+ test.result = 100
+ test.details = {}
+ return test
+
+ @mock.patch('requests.post')
+ def test_http_shema(self, *args):
+ os.environ['TEST_DB_URL'] = 'http://127.0.0.1'
+ test = self._get_testcase()
+ self.assertEqual(test.push_to_db(), testcase.TestCase.EX_OK)
+ args[0].assert_called_once_with(
+ 'http://127.0.0.1', data=self._get_json(),
+ headers={'Content-Type': 'application/json'})
+
+ def test_wrong_shema(self):
+ os.environ['TEST_DB_URL'] = '/dev/null'
+ test = self._get_testcase()
+ self.assertEqual(
+ test.push_to_db(), testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+
+ def _test_dump(self):
+ os.environ['TEST_DB_URL'] = URL
+ with mock.patch.object(decorators, 'open', mock.mock_open(),
+ create=True) as mock_open:
+ test = self._get_testcase()
+ self.assertEqual(test.push_to_db(), testcase.TestCase.EX_OK)
+ mock_open.assert_called_once_with(FILE, 'a')
+ handle = mock_open()
+ call_args, _ = handle.write.call_args
+ self.assertIn('POST', call_args[0])
+ self.assertIn(self._get_json(), call_args[0])
+
+ @mock.patch('os.makedirs')
+ def test_default_dump(self, mock_method=None):
+ self._test_dump()
+ mock_method.assert_called_once_with(DIR)
+
+ @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
+ def test_makedirs_dir_exists(self, mock_method=None):
+ self._test_dump()
+ mock_method.assert_called_once_with(DIR)
+
+ @mock.patch('os.makedirs', side_effect=OSError)
+ def test_makedirs_exc(self, *args):
+ os.environ['TEST_DB_URL'] = URL
+ test = self._get_testcase()
+ self.assertEqual(
+ test.push_to_db(), testcase.TestCase.EX_PUSH_TO_DB_ERROR)
+ args[0].assert_called_once_with(DIR)
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/tests/unit/utils/test_env.py b/xtesting/tests/unit/utils/test_env.py
new file mode 100644
index 00000000..08601fa5
--- /dev/null
+++ b/xtesting/tests/unit/utils/test_env.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import logging
+import os
+import unittest
+
+from six.moves import reload_module
+
+from xtesting.utils import env
+
+
+class EnvTesting(unittest.TestCase):
+ # pylint: disable=missing-docstring
+
+ def setUp(self):
+ os.environ['FOO'] = 'foo'
+ os.environ['BUILD_TAG'] = 'master'
+ os.environ['CI_LOOP'] = 'weekly'
+
+ def test_get_unset_unknown_env(self):
+ del os.environ['FOO']
+ self.assertEqual(env.get('FOO'), None)
+
+ def test_get_unknown_env(self):
+ self.assertEqual(env.get('FOO'), 'foo')
+ reload_module(env)
+
+ def test_get_unset_env(self):
+ del os.environ['CI_LOOP']
+ self.assertEqual(
+ env.get('CI_LOOP'), env.INPUTS['CI_LOOP'])
+
+ def test_get_env(self):
+ self.assertEqual(
+ env.get('CI_LOOP'), 'weekly')
+
+ def test_get_unset_env2(self):
+ del os.environ['BUILD_TAG']
+ self.assertEqual(
+ env.get('BUILD_TAG'), env.INPUTS['BUILD_TAG'])
+
+ def test_get_env2(self):
+ self.assertEqual(env.get('BUILD_TAG'), 'master')
+
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)
diff --git a/xtesting/utils/__init__.py b/xtesting/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/utils/__init__.py
diff --git a/xtesting/utils/constants.py b/xtesting/utils/constants.py
new file mode 100644
index 00000000..dae08ca6
--- /dev/null
+++ b/xtesting/utils/constants.py
@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+
+# pylint: disable=missing-docstring
+
+ENV_FILE = '/home/opnfv/xtesting/conf/env_file'
diff --git a/xtesting/utils/decorators.py b/xtesting/utils/decorators.py
new file mode 100644
index 00000000..230a99e7
--- /dev/null
+++ b/xtesting/utils/decorators.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import errno
+import functools
+import os
+
+import mock
+import requests.sessions
+from six.moves import urllib
+
+
+def can_dump_request_to_file(method):
+
+ def dump_preparedrequest(request, **kwargs):
+ # pylint: disable=unused-argument
+ parseresult = urllib.parse.urlparse(request.url)
+ if parseresult.scheme == "file":
+ try:
+ dirname = os.path.dirname(parseresult.path)
+ os.makedirs(dirname)
+ except OSError as ex:
+ if ex.errno != errno.EEXIST:
+ raise
+ with open(parseresult.path, 'a') as dumpfile:
+ headers = ""
+ for key in request.headers:
+ headers += key + " " + request.headers[key] + "\n"
+ message = "{} {}\n{}\n{}\n\n\n".format(
+ request.method, request.url, headers, request.body)
+ dumpfile.write(message)
+ return mock.Mock()
+
+ def patch_request(method, url, **kwargs):
+ with requests.sessions.Session() as session:
+ parseresult = urllib.parse.urlparse(url)
+ if parseresult.scheme == "file":
+ with mock.patch.object(session, 'send',
+ side_effect=dump_preparedrequest):
+ return session.request(method=method, url=url, **kwargs)
+ else:
+ return session.request(method=method, url=url, **kwargs)
+
+ @functools.wraps(method)
+ def hook(*args, **kwargs):
+ with mock.patch('requests.api.request', side_effect=patch_request):
+ return method(*args, **kwargs)
+
+ return hook
diff --git a/xtesting/utils/env.py b/xtesting/utils/env.py
new file mode 100644
index 00000000..aa2da0b5
--- /dev/null
+++ b/xtesting/utils/env.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2018 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# pylint: disable=missing-docstring
+
+import os
+
+import prettytable
+
+INPUTS = {
+ 'EXTERNAL_NETWORK': None,
+ 'CI_LOOP': 'daily',
+ 'DEPLOY_SCENARIO': 'os-nosdn-nofeature-noha',
+ 'INSTALLER_TYPE': None,
+ 'SDN_CONTROLLER_IP': None,
+ 'BUILD_TAG': None,
+ 'NODE_NAME': None,
+ 'POD_ARCH': None,
+ 'TEST_DB_URL': 'http://testresults.opnfv.org/test/api/v1/results',
+ 'ENERGY_RECORDER_API_URL': 'http://energy.opnfv.fr/resources',
+ 'ENERGY_RECORDER_API_USER': None,
+ 'ENERGY_RECORDER_API_PASSWORD': None
+}
+
+
+def get(env_var):
+ if env_var not in INPUTS.keys():
+ return os.environ.get(env_var, None)
+ return os.environ.get(env_var, INPUTS[env_var])
+
+
+def string():
+ msg = prettytable.PrettyTable(
+ header_style='upper', padding_width=5,
+ field_names=['env var', 'value'])
+ for env_var in INPUTS:
+ msg.add_row([env_var, get(env_var) if get(env_var) else ''])
+ return msg