aboutsummaryrefslogtreecommitdiffstats
path: root/functest
diff options
context:
space:
mode:
Diffstat (limited to 'functest')
-rw-r--r--functest/__init__.py0
-rw-r--r--functest/ci/__init__.py0
-rw-r--r--functest/ci/logging.ini80
-rw-r--r--functest/ci/run_tests.py302
-rw-r--r--functest/ci/testcases.yaml424
-rw-r--r--functest/ci/tier_builder.py106
-rw-r--r--functest/ci/tier_handler.py174
-rw-r--r--functest/core/__init__.py0
-rw-r--r--functest/core/feature.py133
-rw-r--r--functest/core/robotframework.py126
-rw-r--r--functest/core/testcase.py227
-rw-r--r--functest/core/unit.py92
-rw-r--r--functest/core/vnf.py205
-rw-r--r--functest/energy/__init__.py0
-rw-r--r--functest/energy/energy.py336
-rw-r--r--functest/tests/__init__.py0
-rw-r--r--functest/tests/unit/__init__.py0
-rw-r--r--functest/tests/unit/ci/__init__.py0
-rw-r--r--functest/tests/unit/ci/test_run_tests.py267
-rw-r--r--functest/tests/unit/ci/test_tier_builder.py93
-rw-r--r--functest/tests/unit/ci/test_tier_handler.py139
-rw-r--r--functest/tests/unit/core/__init__.py0
-rw-r--r--functest/tests/unit/core/test_feature.py117
-rw-r--r--functest/tests/unit/core/test_robotframework.py199
-rw-r--r--functest/tests/unit/core/test_testcase.py277
-rw-r--r--functest/tests/unit/core/test_unit.py98
-rw-r--r--functest/tests/unit/core/test_vnf.py187
-rw-r--r--functest/tests/unit/energy/__init__.py0
-rw-r--r--functest/tests/unit/energy/test_functest_energy.py371
-rw-r--r--functest/tests/unit/utils/__init__.py0
-rw-r--r--functest/tests/unit/utils/test_decorators.py125
-rw-r--r--functest/tests/unit/utils/test_env.py57
-rw-r--r--functest/utils/__init__.py0
-rw-r--r--functest/utils/constants.py10
-rw-r--r--functest/utils/decorators.py57
-rw-r--r--functest/utils/env.py44
36 files changed, 0 insertions, 4246 deletions
diff --git a/functest/__init__.py b/functest/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/__init__.py
+++ /dev/null
diff --git a/functest/ci/__init__.py b/functest/ci/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/ci/__init__.py
+++ /dev/null
diff --git a/functest/ci/logging.ini b/functest/ci/logging.ini
deleted file mode 100644
index f1ab7241..00000000
--- a/functest/ci/logging.ini
+++ /dev/null
@@ -1,80 +0,0 @@
-[loggers]
-keys=root,functest,api,ci,cli,core,energy,opnfv_tests,utils
-
-[handlers]
-keys=console,wconsole,file,null
-
-[formatters]
-keys=standard
-
-[logger_root]
-level=NOTSET
-handlers=null
-
-[logger_functest]
-level=NOTSET
-handlers=file
-qualname=functest
-
-[logger_api]
-level=NOTSET
-handlers=wconsole
-qualname=functest.api
-
-[logger_ci]
-level=NOTSET
-handlers=console
-qualname=functest.ci
-
-[logger_cli]
-level=NOTSET
-handlers=wconsole
-qualname=functest.cli
-
-[logger_core]
-level=NOTSET
-handlers=console
-qualname=functest.core
-
-[logger_energy]
-level=NOTSET
-handlers=wconsole
-qualname=functest.energy
-
-[logger_opnfv_tests]
-level=NOTSET
-handlers=wconsole
-qualname=functest.opnfv_tests
-
-[logger_utils]
-level=NOTSET
-handlers=wconsole
-qualname=functest.utils
-
-[handler_null]
-class=NullHandler
-level=NOTSET
-formatter=standard
-args=()
-
-[handler_console]
-class=StreamHandler
-level=INFO
-formatter=standard
-args=(sys.stdout,)
-
-[handler_wconsole]
-class=StreamHandler
-level=WARN
-formatter=standard
-args=(sys.stdout,)
-
-[handler_file]
-class=FileHandler
-level=DEBUG
-formatter=standard
-args=("/home/opnfv/functest/results/functest.log",)
-
-[formatter_standard]
-format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
-datefmt=
diff --git a/functest/ci/run_tests.py b/functest/ci/run_tests.py
deleted file mode 100644
index 651a3851..00000000
--- a/functest/ci/run_tests.py
+++ /dev/null
@@ -1,302 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Ericsson AB and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-""" The entry of running tests:
-1) Parses functest/ci/testcases.yaml to check which testcase(s) to be run
-2) Execute the common operations on every testcase (run, push results to db...)
-3) Return the right status code
-"""
-
-import argparse
-import importlib
-import logging
-import logging.config
-import os
-import re
-import sys
-import textwrap
-import pkg_resources
-
-import enum
-import prettytable
-import yaml
-
-from functest.ci import tier_builder
-from functest.core import testcase
-from functest.utils import constants
-from functest.utils import env
-
-LOGGER = logging.getLogger('functest.ci.run_tests')
-
-
-class Result(enum.Enum):
- """The overall result in enumerated type"""
- # pylint: disable=too-few-public-methods
- EX_OK = os.EX_OK
- EX_ERROR = -1
-
-
-class BlockingTestFailed(Exception):
- """Exception when the blocking test fails"""
- pass
-
-
-class TestNotEnabled(Exception):
- """Exception when the test is not enabled"""
- pass
-
-
-class RunTestsParser(object):
- """Parser to run tests"""
- # pylint: disable=too-few-public-methods
-
- def __init__(self):
- self.parser = argparse.ArgumentParser()
- self.parser.add_argument("-t", "--test", dest="test", action='store',
- help="Test case or tier (group of tests) "
- "to be executed. It will run all the test "
- "if not specified.")
- self.parser.add_argument("-n", "--noclean", help="Do not clean "
- "OpenStack resources after running each "
- "test (default=false).",
- action="store_true")
- self.parser.add_argument("-r", "--report", help="Push results to "
- "database (default=false).",
- action="store_true")
-
- def parse_args(self, argv=None):
- """Parse arguments.
-
- It can call sys.exit if arguments are incorrect.
-
- Returns:
- the arguments from cmdline
- """
- return vars(self.parser.parse_args(argv))
-
-
-class Runner(object):
- """Runner class"""
-
- def __init__(self):
- self.executed_test_cases = {}
- self.overall_result = Result.EX_OK
- self.clean_flag = True
- self.report_flag = False
- self.tiers = tier_builder.TierBuilder(
- env.get('INSTALLER_TYPE'),
- env.get('DEPLOY_SCENARIO'),
- pkg_resources.resource_filename('functest', 'ci/testcases.yaml'))
-
- @staticmethod
- def source_envfile(rc_file=constants.ENV_FILE):
- """Source the env file passed as arg"""
- if not os.path.isfile(rc_file):
- LOGGER.debug("No env file %s found", rc_file)
- return
- with open(rc_file, "r") as rcfd:
- for line in rcfd:
- var = (line.rstrip('"\n').replace('export ', '').split(
- "=") if re.search(r'(.*)=(.*)', line) else None)
- # The two next lines should be modified as soon as rc_file
- # conforms with common rules. Be aware that it could induce
- # issues if value starts with '
- if var:
- key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
- value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
- os.environ[key] = value
- rcfd.seek(0, 0)
- LOGGER.info("Sourcing env file %s\n\n%s", rc_file, rcfd.read())
-
- @staticmethod
- def get_dict_by_test(testname):
- # pylint: disable=bad-continuation,missing-docstring
- with open(pkg_resources.resource_filename(
- 'functest', 'ci/testcases.yaml')) as tyaml:
- testcases_yaml = yaml.safe_load(tyaml)
- for dic_tier in testcases_yaml.get("tiers"):
- for dic_testcase in dic_tier['testcases']:
- if dic_testcase['case_name'] == testname:
- return dic_testcase
- LOGGER.error('Project %s is not defined in testcases.yaml', testname)
- return None
-
- @staticmethod
- def get_run_dict(testname):
- """Obtain the 'run' block of the testcase from testcases.yaml"""
- try:
- dic_testcase = Runner.get_dict_by_test(testname)
- if not dic_testcase:
- LOGGER.error("Cannot get %s's config options", testname)
- elif 'run' in dic_testcase:
- return dic_testcase['run']
- return None
- except Exception: # pylint: disable=broad-except
- LOGGER.exception("Cannot get %s's config options", testname)
- return None
-
- def run_test(self, test):
- """Run one test case"""
- if not test.is_enabled():
- raise TestNotEnabled(
- "The test case {} is not enabled".format(test.get_name()))
- LOGGER.info("Running test case '%s'...", test.get_name())
- result = testcase.TestCase.EX_RUN_ERROR
- run_dict = self.get_run_dict(test.get_name())
- if run_dict:
- try:
- module = importlib.import_module(run_dict['module'])
- cls = getattr(module, run_dict['class'])
- test_dict = Runner.get_dict_by_test(test.get_name())
- test_case = cls(**test_dict)
- self.executed_test_cases[test.get_name()] = test_case
- try:
- kwargs = run_dict['args']
- test_case.run(**kwargs)
- except KeyError:
- test_case.run()
- if self.report_flag:
- test_case.push_to_db()
- if test.get_project() == "functest":
- result = test_case.is_successful()
- else:
- result = testcase.TestCase.EX_OK
- LOGGER.info("Test result:\n\n%s\n", test_case)
- if self.clean_flag:
- test_case.clean()
- except ImportError:
- LOGGER.exception("Cannot import module %s", run_dict['module'])
- except AttributeError:
- LOGGER.exception("Cannot get class %s", run_dict['class'])
- else:
- raise Exception("Cannot import the class for the test case.")
- return result
-
- def run_tier(self, tier):
- """Run one tier"""
- tier_name = tier.get_name()
- tests = tier.get_tests()
- if not tests:
- LOGGER.info("There are no supported test cases in this tier "
- "for the given scenario")
- self.overall_result = Result.EX_ERROR
- else:
- LOGGER.info("Running tier '%s'", tier_name)
- for test in tests:
- self.run_test(test)
- test_case = self.executed_test_cases[test.get_name()]
- if test_case.is_successful() != testcase.TestCase.EX_OK:
- LOGGER.error("The test case '%s' failed.", test.get_name())
- if test.get_project() == "functest":
- self.overall_result = Result.EX_ERROR
- if test.is_blocking():
- raise BlockingTestFailed(
- "The test case {} failed and is blocking".format(
- test.get_name()))
- return self.overall_result
-
- def run_all(self):
- """Run all available testcases"""
- tiers_to_run = []
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['tiers', 'order', 'CI Loop', 'description',
- 'testcases'])
- for tier in self.tiers.get_tiers():
- ci_loop = env.get('CI_LOOP')
- if (tier.get_tests() and
- re.search(ci_loop, tier.get_ci_loop()) is not None):
- tiers_to_run.append(tier)
- msg.add_row([tier.get_name(), tier.get_order(),
- tier.get_ci_loop(),
- textwrap.fill(tier.description, width=40),
- textwrap.fill(' '.join([str(x.get_name(
- )) for x in tier.get_tests()]), width=40)])
- LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
- for tier in tiers_to_run:
- self.run_tier(tier)
-
- def main(self, **kwargs):
- """Entry point of class Runner"""
- if 'noclean' in kwargs:
- self.clean_flag = not kwargs['noclean']
- if 'report' in kwargs:
- self.report_flag = kwargs['report']
- try:
- LOGGER.info("Deployment description:\n\n%s\n", env.string())
- self.source_envfile()
- if 'test' in kwargs:
- LOGGER.debug("Test args: %s", kwargs['test'])
- if self.tiers.get_tier(kwargs['test']):
- self.run_tier(self.tiers.get_tier(kwargs['test']))
- elif self.tiers.get_test(kwargs['test']):
- result = self.run_test(
- self.tiers.get_test(kwargs['test']))
- if result != testcase.TestCase.EX_OK:
- LOGGER.error("The test case '%s' failed.",
- kwargs['test'])
- self.overall_result = Result.EX_ERROR
- elif kwargs['test'] == "all":
- self.run_all()
- else:
- LOGGER.error("Unknown test case or tier '%s', or not "
- "supported by the given scenario '%s'.",
- kwargs['test'],
- env.get('DEPLOY_SCENARIO'))
- LOGGER.debug("Available tiers are:\n\n%s",
- self.tiers)
- return Result.EX_ERROR
- else:
- self.run_all()
- except BlockingTestFailed:
- pass
- except Exception: # pylint: disable=broad-except
- LOGGER.exception("Failures when running testcase(s)")
- self.overall_result = Result.EX_ERROR
- if not self.tiers.get_test(kwargs['test']):
- self.summary(self.tiers.get_tier(kwargs['test']))
- LOGGER.info("Execution exit value: %s", self.overall_result)
- return self.overall_result
-
- def summary(self, tier=None):
- """To generate functest report showing the overall results"""
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['test case', 'project', 'tier',
- 'duration', 'result'])
- tiers = [tier] if tier else self.tiers.get_tiers()
- for each_tier in tiers:
- for test in each_tier.get_tests():
- try:
- test_case = self.executed_test_cases[test.get_name()]
- except KeyError:
- msg.add_row([test.get_name(), test.get_project(),
- each_tier.get_name(), "00:00", "SKIP"])
- else:
- result = 'PASS' if(test_case.is_successful(
- ) == test_case.EX_OK) else 'FAIL'
- msg.add_row(
- [test_case.case_name, test_case.project_name,
- self.tiers.get_tier_name(test_case.case_name),
- test_case.get_duration(), result])
- for test in each_tier.get_skipped_test():
- msg.add_row([test.get_name(), test.get_project(),
- each_tier.get_name(), "00:00", "SKIP"])
- LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg)
-
-
-def main():
- """Entry point"""
- logging.config.fileConfig(pkg_resources.resource_filename(
- 'functest', 'ci/logging.ini'))
- logging.captureWarnings(True)
- parser = RunTestsParser()
- args = parser.parse_args(sys.argv[1:])
- runner = Runner()
- return runner.main(**args).value
diff --git a/functest/ci/testcases.yaml b/functest/ci/testcases.yaml
deleted file mode 100644
index 953d2aea..00000000
--- a/functest/ci/testcases.yaml
+++ /dev/null
@@ -1,424 +0,0 @@
----
-tiers:
- -
- name: healthcheck
- order: 0
- ci_loop: '(daily)|(weekly)'
- description: >-
- First tier to be executed to verify the basic
- operations in the VIM.
- testcases:
- -
- case_name: connection_check
- project_name: functest
- criteria: 100
- blocking: true
- description: >-
- This test case verifies the retrieval of OpenStack clients:
- Keystone, Glance, Neutron and Nova and may perform some
- simple queries. When the config value of
- snaps.use_keystone is True, functest must have access to
- the cloud's private network.
- dependencies:
- installer: '^((?!netvirt).)*$'
- scenario: ''
- run:
- module:
- 'functest.opnfv_tests.openstack.snaps.connection_check'
- class: 'ConnectionCheck'
-
- -
- case_name: api_check
- project_name: functest
- criteria: 100
- blocking: true
- description: >-
- This test case verifies the retrieval of OpenStack clients:
- Keystone, Glance, Neutron and Nova and may perform some
- simple queries. When the config value of
- snaps.use_keystone is True, functest must have access to
- the cloud's private network.
- dependencies:
- installer: '^((?!netvirt).)*$'
- scenario: '^((?!lxd).)*$'
- run:
- module: 'functest.opnfv_tests.openstack.snaps.api_check'
- class: 'ApiCheck'
-
- -
- case_name: snaps_health_check
- project_name: functest
- criteria: 100
- blocking: true
- description: >-
- This test case creates executes the SimpleHealthCheck
- Python test class which creates an, image, flavor, network,
- and Cirros VM instance and observes the console output to
- validate the single port obtains the correct IP address.
- dependencies:
- installer: ''
- scenario: '^((?!lxd).)*$'
- run:
- module: 'functest.opnfv_tests.openstack.snaps.health_check'
- class: 'HealthCheck'
-
- -
- name: smoke
- order: 1
- ci_loop: '(daily)|(weekly)'
- description: >-
- Set of basic Functional tests to validate the OPNFV scenarios.
- testcases:
- -
- case_name: vping_ssh
- project_name: functest
- criteria: 100
- blocking: true
- description: >-
- This test case verifies: 1) SSH to an instance using
- floating IPs over the public network. 2) Connectivity
- between 2 instances over a private network.
- dependencies:
- installer: ''
- scenario: '^((?!odl_l3|odl-bgpvpn|gluon).)*$'
- run:
- module: 'functest.opnfv_tests.openstack.vping.vping_ssh'
- class: 'VPingSSH'
-
- -
- case_name: vping_userdata
- project_name: functest
- criteria: 100
- blocking: true
- description: >-
- This test case verifies: 1) Boot a VM with given userdata.
- 2) Connectivity between 2 instances over a private network.
- dependencies:
- installer: ''
- scenario: '^((?!lxd).)*$'
- run:
- module:
- 'functest.opnfv_tests.openstack.vping.vping_userdata'
- class: 'VPingUserdata'
-
- -
- case_name: tempest_smoke_serial
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- This test case runs the smoke subset of the OpenStack
- Tempest suite. The list of test cases is generated by
- Tempest automatically and depends on the parameters of
- the OpenStack deplopyment.
- dependencies:
- installer: '^((?!netvirt).)*$'
- scenario: ''
- run:
- module: 'functest.opnfv_tests.openstack.tempest.tempest'
- class: 'TempestSmokeSerial'
-
- -
- case_name: rally_sanity
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- This test case runs a sub group of tests of the OpenStack
- Rally suite in smoke mode.
- dependencies:
- installer: ''
- scenario: ''
- run:
- module: 'functest.opnfv_tests.openstack.rally.rally'
- class: 'RallySanity'
-
- -
- case_name: refstack_defcore
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- This test case runs a sub group of tests of the OpenStack
- Defcore testcases by using refstack client.
- dependencies:
- installer: ''
- scenario: ''
- run:
- module:
- 'functest.opnfv_tests.openstack.refstack_client.refstack_client'
- class: 'RefstackClient'
-
- -
- case_name: odl
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- Test Suite for the OpenDaylight SDN Controller. It
- integrates some test suites from upstream using
- Robot as the test framework.
- dependencies:
- installer: ''
- scenario: 'odl'
- run:
- module: 'functest.opnfv_tests.sdn.odl.odl'
- class: 'ODLTests'
- args:
- suites:
- - /src/odl_test/csit/suites/integration/basic
- - /src/odl_test/csit/suites/openstack/neutron
-
- -
- case_name: odl_netvirt
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- Test Suite for the OpenDaylight SDN Controller when
- the NetVirt features are installed. It integrates
- some test suites from upstream using Robot as the
- test framework.
- dependencies:
- installer: 'apex'
- scenario: 'os-odl_l3-nofeature'
- run:
- module: 'functest.opnfv_tests.sdn.odl.odl'
- class: 'ODLTests'
- args:
- suites:
- - /src/odl_test/csit/suites/integration/basic
- - /src/odl_test/csit/suites/openstack/neutron
- - /src/odl_test/csit/suites/openstack/connectivity
-
- -
- case_name: snaps_smoke
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- This test case contains tests that setup and destroy
- environments with VMs with and without Floating IPs
- with a newly created user and project. Set the config
- value snaps.use_floating_ips (True|False) to toggle
- this functionality. When the config value of
- snaps.use_keystone is True, functest must have access to
- the cloud's private network.
-
- dependencies:
- installer: '^((?!netvirt).)*$'
- scenario: '^((?!lxd).)*$'
- run:
- module: 'functest.opnfv_tests.openstack.snaps.smoke'
- class: 'SnapsSmoke'
-
- -
- name: features
- order: 2
- ci_loop: '(daily)|(weekly)'
- description: >-
- Test suites from feature projects
- integrated in functest
- testcases:
- -
- case_name: doctor-notification
- project_name: doctor
- criteria: 100
- blocking: false
- description: >-
- Test suite from Doctor project.
- dependencies:
- installer: 'apex'
- scenario: '^((?!fdio).)*$'
- run:
- module: 'functest.core.feature'
- class: 'BashFeature'
- args:
- cmd: 'doctor-test'
-
- -
- case_name: bgpvpn
- project_name: sdnvpn
- criteria: 100
- blocking: false
- description: >-
- Test suite from SDNVPN project.
- dependencies:
- installer: '(fuel)|(apex)|(netvirt)'
- scenario: 'bgpvpn'
- run:
- module: 'sdnvpn.test.functest.run_sdnvpn_tests'
- class: 'SdnvpnFunctest'
-
- -
- case_name: functest-odl-sfc
- project_name: sfc
- criteria: 100
- blocking: false
- description: >-
- Test suite for odl-sfc to test two chains with one SF and
- one chain with two SFs
- dependencies:
- installer: ''
- scenario: 'odl.*sfc'
- run:
- module: 'functest.core.feature'
- class: 'BashFeature'
- args:
- cmd: 'run_sfc_tests.py'
-
- -
- case_name: barometercollectd
- project_name: barometer
- criteria: 100
- blocking: false
- description: >-
- Test suite for the Barometer project. Separate tests verify
- the proper configuration and basic functionality of all the
- collectd plugins as described in the Project Release Plan
- dependencies:
- installer: 'apex'
- scenario: 'bar'
- run:
- module: 'baro_tests.barometer'
- class: 'BarometerCollectd'
-
- -
- case_name: fds
- project_name: fastdatastacks
- criteria: 100
- blocking: false
- description: >-
- Test Suite for the OpenDaylight SDN Controller when GBP
- features are installed. It integrates some test suites from
- upstream using Robot as the test framework.
- dependencies:
- installer: 'apex'
- scenario: 'odl.*-fdio'
- run:
- module: 'functest.opnfv_tests.sdn.odl.odl'
- class: 'ODLTests'
- args:
- suites:
- - /src/fds/testing/robot
-
- -
- name: components
- order: 3
- ci_loop: 'weekly'
- description: >-
- Extensive testing of OpenStack API.
- testcases:
- -
- case_name: tempest_full_parallel
- project_name: functest
- criteria: 80
- blocking: false
- description: >-
- The list of test cases is generated by
- Tempest automatically and depends on the parameters of
- the OpenStack deplopyment.
- dependencies:
- installer: '^((?!netvirt).)*$'
- scenario: ''
- run:
- module: 'functest.opnfv_tests.openstack.tempest.tempest'
- class: 'TempestFullParallel'
-
- -
- case_name: rally_full
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- This test case runs the full suite of scenarios of the
- OpenStack Rally suite using several threads and iterations.
- dependencies:
- installer: '^((?!netvirt).)*$'
- scenario: ''
- run:
- module: 'functest.opnfv_tests.openstack.rally.rally'
- class: 'RallyFull'
-
- -
- name: vnf
- order: 4
- ci_loop: '(daily)|(weekly)'
- description: >-
- Collection of VNF test cases.
- testcases:
- -
- case_name: cloudify_ims
- project_name: functest
- criteria: 80
- blocking: false
- description: >-
- This test case deploys an OpenSource vIMS solution from
- Clearwater using the Cloudify orchestrator. It also runs
- some signaling traffic.
- dependencies:
- installer: ''
- scenario: 'os-nosdn-nofeature-.*ha'
- run:
- module: 'functest.opnfv_tests.vnf.ims.cloudify_ims'
- class: 'CloudifyIms'
-
- -
- case_name: vyos_vrouter
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- This test case is vRouter testing.
- dependencies:
- installer: ''
- scenario: 'os-nosdn-nofeature-.*ha'
- run:
- module: 'functest.opnfv_tests.vnf.router.cloudify_vrouter'
- class: 'CloudifyVrouter'
-
- -
- case_name: orchestra_openims
- project_name: orchestra
- enabled: false
- criteria: 100
- blocking: false
- description: >-
- OpenIMS VNF deployment with Open Baton (Orchestra)
- dependencies:
- installer: ''
- scenario: 'os-nosdn-nofeature-.*ha'
- run:
- module: 'functest.opnfv_tests.vnf.ims.orchestra_openims'
- class: 'OpenImsVnf'
-
- -
- case_name: orchestra_clearwaterims
- project_name: orchestra
- enabled: false
- criteria: 100
- blocking: false
- description: >-
- ClearwaterIMS VNF deployment with Open Baton (Orchestra)
- dependencies:
- installer: ''
- scenario: 'os-nosdn-nofeature-.*ha'
- run:
- module:
- 'functest.opnfv_tests.vnf.ims.orchestra_clearwaterims'
- class: 'ClearwaterImsVnf'
-
- -
- case_name: juju_epc
- project_name: functest
- criteria: 100
- blocking: false
- description: >-
- vEPC validation with Juju as VNF manager and ABoT as test
- executor.
- dependencies:
- installer: ''
- scenario: 'os-nosdn-nofeature-.*ha'
- run:
- module: 'functest.opnfv_tests.vnf.epc.juju_epc'
- class: 'JujuEpc'
diff --git a/functest/ci/tier_builder.py b/functest/ci/tier_builder.py
deleted file mode 100644
index 370ab94d..00000000
--- a/functest/ci/tier_builder.py
+++ /dev/null
@@ -1,106 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Ericsson AB and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""TierBuilder class to parse testcases config file"""
-
-import yaml
-
-import functest.ci.tier_handler as th
-
-
-class TierBuilder(object):
- # pylint: disable=missing-docstring
-
- def __init__(self, ci_installer, ci_scenario, testcases_file):
- self.ci_installer = ci_installer
- self.ci_scenario = ci_scenario
- self.testcases_file = testcases_file
- self.dic_tier_array = None
- self.tier_objects = []
- self.testcases_yaml = None
- self.generate_tiers()
-
- def read_test_yaml(self):
- with open(self.testcases_file) as tc_file:
- self.testcases_yaml = yaml.safe_load(tc_file)
-
- self.dic_tier_array = []
- for tier in self.testcases_yaml.get("tiers"):
- self.dic_tier_array.append(tier)
-
- def generate_tiers(self):
- if self.dic_tier_array is None:
- self.read_test_yaml()
-
- del self.tier_objects[:]
- for dic_tier in self.dic_tier_array:
- tier = th.Tier(
- name=dic_tier['name'], order=dic_tier['order'],
- ci_loop=dic_tier['ci_loop'],
- description=dic_tier['description'])
-
- for dic_testcase in dic_tier['testcases']:
- installer = dic_testcase['dependencies']['installer']
- scenario = dic_testcase['dependencies']['scenario']
- dep = th.Dependency(installer, scenario)
-
- testcase = th.TestCase(
- name=dic_testcase['case_name'],
- enabled=dic_testcase.get('enabled', True),
- dependency=dep, criteria=dic_testcase['criteria'],
- blocking=dic_testcase['blocking'],
- description=dic_testcase['description'],
- project=dic_testcase['project_name'])
- if (testcase.is_compatible(self.ci_installer,
- self.ci_scenario) and
- testcase.is_enabled()):
- tier.add_test(testcase)
- else:
- tier.skip_test(testcase)
-
- self.tier_objects.append(tier)
-
- def get_tiers(self):
- return self.tier_objects
-
- def get_tier_names(self):
- tier_names = []
- for tier in self.tier_objects:
- tier_names.append(tier.get_name())
- return tier_names
-
- def get_tier(self, tier_name):
- for i in range(0, len(self.tier_objects)):
- if self.tier_objects[i].get_name() == tier_name:
- return self.tier_objects[i]
- return None
-
- def get_tier_name(self, test_name):
- for i in range(0, len(self.tier_objects)):
- if self.tier_objects[i].is_test(test_name):
- return self.tier_objects[i].name
- return None
-
- def get_test(self, test_name):
- for i in range(0, len(self.tier_objects)):
- if self.tier_objects[i].is_test(test_name):
- return self.tier_objects[i].get_test(test_name)
- return None
-
- def get_tests(self, tier_name):
- for i in range(0, len(self.tier_objects)):
- if self.tier_objects[i].get_name() == tier_name:
- return self.tier_objects[i].get_tests()
- return None
-
- def __str__(self):
- output = ""
- for i in range(0, len(self.tier_objects)):
- output += str(self.tier_objects[i]) + "\n"
- return output
diff --git a/functest/ci/tier_handler.py b/functest/ci/tier_handler.py
deleted file mode 100644
index 9fc3f24d..00000000
--- a/functest/ci/tier_handler.py
+++ /dev/null
@@ -1,174 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Ericsson AB and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Tier and TestCase classes to wrap the testcases config file"""
-# pylint: disable=missing-docstring
-
-import re
-import textwrap
-
-import prettytable
-
-
-LINE_LENGTH = 72
-
-
-def split_text(text, max_len):
- words = text.split()
- lines = []
- line = ""
- for word in words:
- if len(line) + len(word) < max_len - 1:
- line += word + " "
- else:
- lines.append(line)
- line = word + " "
- if line != "":
- lines.append(line)
- return lines
-
-
-class Tier(object):
-
- def __init__(self, name, order, ci_loop, description=""):
- self.tests_array = []
- self.skipped_tests_array = []
- self.name = name
- self.order = order
- self.ci_loop = ci_loop
- self.description = description
-
- def add_test(self, testcase):
- self.tests_array.append(testcase)
-
- def skip_test(self, testcase):
- self.skipped_tests_array.append(testcase)
-
- def get_tests(self):
- array_tests = []
- for test in self.tests_array:
- array_tests.append(test)
- return array_tests
-
- def get_skipped_test(self):
- return self.skipped_tests_array
-
- def get_test_names(self):
- array_tests = []
- for test in self.tests_array:
- array_tests.append(test.get_name())
- return array_tests
-
- def get_test(self, test_name):
- if self.is_test(test_name):
- for test in self.tests_array:
- if test.get_name() == test_name:
- return test
- return None
-
- def is_test(self, test_name):
- for test in self.tests_array:
- if test.get_name() == test_name:
- return True
- return False
-
- def get_name(self):
- return self.name
-
- def get_order(self):
- return self.order
-
- def get_ci_loop(self):
- return self.ci_loop
-
- def __str__(self):
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['tiers', 'order', 'CI Loop', 'description',
- 'testcases'])
- msg.add_row(
- [self.name, self.order, self.ci_loop,
- textwrap.fill(self.description, width=40),
- textwrap.fill(' '.join([str(x.get_name(
- )) for x in self.get_tests()]), width=40)])
- return msg.get_string()
-
-
-class TestCase(object):
-
- def __init__(self, name, enabled, dependency, criteria, blocking,
- description="", project=""):
- # pylint: disable=too-many-arguments
- self.name = name
- self.enabled = enabled
- self.dependency = dependency
- self.criteria = criteria
- self.blocking = blocking
- self.description = description
- self.project = project
-
- @staticmethod
- def is_none(item):
- return item is None or item == ""
-
- def is_compatible(self, ci_installer, ci_scenario):
- try:
- if not self.is_none(ci_installer):
- if re.search(self.dependency.get_installer(),
- ci_installer) is None:
- return False
- if not self.is_none(ci_scenario):
- if re.search(self.dependency.get_scenario(),
- ci_scenario) is None:
- return False
- return True
- except TypeError:
- return False
-
- def get_name(self):
- return self.name
-
- def is_enabled(self):
- return self.enabled
-
- def get_criteria(self):
- return self.criteria
-
- def is_blocking(self):
- return self.blocking
-
- def get_project(self):
- return self.project
-
- def __str__(self):
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['test case', 'description', 'criteria', 'dependency'])
- msg.add_row([self.name, textwrap.fill(self.description, width=40),
- self.criteria, self.dependency])
- return msg.get_string()
-
-
-class Dependency(object):
-
- def __init__(self, installer, scenario):
- self.installer = installer
- self.scenario = scenario
-
- def get_installer(self):
- return self.installer
-
- def get_scenario(self):
- return self.scenario
-
- def __str__(self):
- delimitator = "\n" if self.get_installer(
- ) and self.get_scenario() else ""
- return "{}{}{}".format(self.get_installer(), delimitator,
- self.get_scenario())
diff --git a/functest/core/__init__.py b/functest/core/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/core/__init__.py
+++ /dev/null
diff --git a/functest/core/feature.py b/functest/core/feature.py
deleted file mode 100644
index 65fd5a08..00000000
--- a/functest/core/feature.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 ZTE Corp and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent classes of all Functest Features.
-
-Feature is considered as TestCase offered by Third-party. It offers
-helpers to run any python method or any bash command.
-"""
-
-import logging
-import subprocess
-import time
-
-import functest.core.testcase as base
-
-__author__ = ("Serena Feng <feng.xiaowei@zte.com.cn>, "
- "Cedric Ollivier <cedric.ollivier@orange.com>")
-
-
-class Feature(base.TestCase):
- """Base model for single feature."""
-
- __logger = logging.getLogger(__name__)
- dir_results = "/home/opnfv/functest/results"
-
- def __init__(self, **kwargs):
- super(Feature, self).__init__(**kwargs)
- self.result_file = "{}/{}.log".format(self.dir_results, self.case_name)
- try:
- module = kwargs['run']['module']
- self.logger = logging.getLogger(module)
- except KeyError:
- self.__logger.warning(
- "Cannot get module name %s. Using %s as fallback",
- kwargs, self.case_name)
- self.logger = logging.getLogger(self.case_name)
- handler = logging.StreamHandler()
- handler.setLevel(logging.WARN)
- self.logger.addHandler(handler)
- handler = logging.FileHandler(self.result_file)
- handler.setLevel(logging.DEBUG)
- self.logger.addHandler(handler)
- formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formatter)
- self.logger.addHandler(handler)
-
- def execute(self, **kwargs):
- """Execute the Python method.
-
- The subclasses must override the default implementation which
- is false on purpose.
-
- The new implementation must return 0 if success or anything
- else if failure.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- -1.
- """
- # pylint: disable=unused-argument,no-self-use
- return -1
-
- def run(self, **kwargs):
- """Run the feature.
-
- It allows executing any Python method by calling execute().
-
- It sets the following attributes required to push the results
- to DB:
-
- * result,
- * start_time,
- * stop_time.
-
- It doesn't fulfill details when pushing the results to the DB.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- TestCase.EX_OK if execute() returns 0,
- TestCase.EX_RUN_ERROR otherwise.
- """
- self.start_time = time.time()
- exit_code = base.TestCase.EX_RUN_ERROR
- self.result = 0
- try:
- if self.execute(**kwargs) == 0:
- exit_code = base.TestCase.EX_OK
- self.result = 100
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("%s FAILED", self.project_name)
- self.__logger.info("Test result is stored in '%s'", self.result_file)
- self.stop_time = time.time()
- return exit_code
-
-
-class BashFeature(Feature):
- """Class designed to run any bash command."""
-
- __logger = logging.getLogger(__name__)
-
- def execute(self, **kwargs):
- """Execute the cmd passed as arg
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- 0 if cmd returns 0,
- -1 otherwise.
- """
- ret = -1
- try:
- cmd = kwargs["cmd"]
- with open(self.result_file, 'w+') as f_stdout:
- proc = subprocess.Popen(cmd.split(), stdout=f_stdout,
- stderr=subprocess.STDOUT)
- ret = proc.wait()
- if ret != 0:
- self.__logger.error("Execute command: %s failed", cmd)
- except KeyError:
- self.__logger.error("Please give cmd as arg. kwargs: %s", kwargs)
- return ret
diff --git a/functest/core/robotframework.py b/functest/core/robotframework.py
deleted file mode 100644
index 54574a68..00000000
--- a/functest/core/robotframework.py
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define classes required to run any Robot suites."""
-
-from __future__ import division
-
-import errno
-import logging
-import os
-
-import robot.api
-from robot.errors import RobotError
-import robot.run
-from robot.utils.robottime import timestamp_to_secs
-from six import StringIO
-
-from functest.core import testcase
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-
-class ResultVisitor(robot.api.ResultVisitor):
- """Visitor to get result details."""
-
- def __init__(self):
- self._data = []
-
- def visit_test(self, test):
- output = {}
- output['name'] = test.name
- output['parent'] = test.parent.name
- output['status'] = test.status
- output['starttime'] = test.starttime
- output['endtime'] = test.endtime
- output['critical'] = test.critical
- output['text'] = test.message
- output['elapsedtime'] = test.elapsedtime
- self._data.append(output)
-
- def get_data(self):
- """Get the details of the result."""
- return self._data
-
-
-class RobotFramework(testcase.TestCase):
- """RobotFramework runner."""
-
- __logger = logging.getLogger(__name__)
- dir_results = "/home/opnfv/functest/results"
-
- def __init__(self, **kwargs):
- self.res_dir = os.path.join(self.dir_results, 'robot')
- self.xml_file = os.path.join(self.res_dir, 'output.xml')
- super(RobotFramework, self).__init__(**kwargs)
-
- def parse_results(self):
- """Parse output.xml and get the details in it."""
- result = robot.api.ExecutionResult(self.xml_file)
- visitor = ResultVisitor()
- result.visit(visitor)
- try:
- self.result = 100 * (
- result.suite.statistics.critical.passed /
- result.suite.statistics.critical.total)
- except ZeroDivisionError:
- self.__logger.error("No test has been run")
- self.start_time = timestamp_to_secs(result.suite.starttime)
- self.stop_time = timestamp_to_secs(result.suite.endtime)
- self.details = {}
- self.details['description'] = result.suite.name
- self.details['tests'] = visitor.get_data()
-
- def run(self, **kwargs):
- """Run the RobotFramework suites
-
- Here are the steps:
- * create the output directories if required,
- * get the results in output.xml,
- * delete temporary files.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- EX_OK if all suites ran well.
- EX_RUN_ERROR otherwise.
- """
- try:
- suites = kwargs["suites"]
- variable = kwargs.get("variable", [])
- variablefile = kwargs.get("variablefile", [])
- except KeyError:
- self.__logger.exception("Mandatory args were not passed")
- return self.EX_RUN_ERROR
- try:
- os.makedirs(self.res_dir)
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- self.__logger.exception("Cannot create %s", self.res_dir)
- return self.EX_RUN_ERROR
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot create %s", self.res_dir)
- return self.EX_RUN_ERROR
- stream = StringIO()
- robot.run(*suites, variable=variable, variablefile=variablefile,
- output=self.xml_file, log='NONE',
- report='NONE', stdout=stream)
- self.__logger.info("\n" + stream.getvalue())
- self.__logger.info("Results were successfully generated")
- try:
- self.parse_results()
- self.__logger.info("Results were successfully parsed")
- except RobotError as ex:
- self.__logger.error("Run suites before publishing: %s", ex.message)
- return self.EX_RUN_ERROR
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot parse results")
- return self.EX_RUN_ERROR
- return self.EX_OK
diff --git a/functest/core/testcase.py b/functest/core/testcase.py
deleted file mode 100644
index e8bb1409..00000000
--- a/functest/core/testcase.py
+++ /dev/null
@@ -1,227 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class of all Functest TestCases."""
-
-from datetime import datetime
-import json
-import logging
-import os
-import re
-import requests
-
-from functest.utils import decorators
-from functest.utils import env
-
-
-import prettytable
-
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-
-class TestCase(object):
- """Base model for single test case."""
-
- EX_OK = os.EX_OK
- """everything is OK"""
-
- EX_RUN_ERROR = os.EX_SOFTWARE
- """run() failed"""
-
- EX_PUSH_TO_DB_ERROR = os.EX_SOFTWARE - 1
- """push_to_db() failed"""
-
- EX_TESTCASE_FAILED = os.EX_SOFTWARE - 2
- """results are false"""
-
- _job_name_rule = "(dai|week)ly-(.+?)-[0-9]*"
- _headers = {'Content-Type': 'application/json'}
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- self.details = {}
- self.project_name = kwargs.get('project_name', 'functest')
- self.case_name = kwargs.get('case_name', '')
- self.criteria = kwargs.get('criteria', 100)
- self.result = 0
- self.start_time = 0
- self.stop_time = 0
-
- def __str__(self):
- try:
- assert self.project_name
- assert self.case_name
- result = 'PASS' if(self.is_successful(
- ) == TestCase.EX_OK) else 'FAIL'
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['test case', 'project', 'duration',
- 'result'])
- msg.add_row([self.case_name, self.project_name,
- self.get_duration(), result])
- return msg.get_string()
- except AssertionError:
- self.__logger.error("We cannot print invalid objects")
- return super(TestCase, self).__str__()
-
- def get_duration(self):
- """Return the duration of the test case.
-
- Returns:
- duration if start_time and stop_time are set
- "XX:XX" otherwise.
- """
- try:
- assert self.start_time
- assert self.stop_time
- if self.stop_time < self.start_time:
- return "XX:XX"
- return "{0[0]:02.0f}:{0[1]:02.0f}".format(divmod(
- self.stop_time - self.start_time, 60))
- except Exception: # pylint: disable=broad-except
- self.__logger.error("Please run test before getting the duration")
- return "XX:XX"
-
- def is_successful(self):
- """Interpret the result of the test case.
-
- It allows getting the result of TestCase. It completes run()
- which only returns the execution status.
-
- It can be overriden if checking result is not suitable.
-
- Returns:
- TestCase.EX_OK if result is 'PASS'.
- TestCase.EX_TESTCASE_FAILED otherwise.
- """
- try:
- assert self.criteria
- assert self.result is not None
- if (not isinstance(self.result, str) and
- not isinstance(self.criteria, str)):
- if self.result >= self.criteria:
- return TestCase.EX_OK
- else:
- # Backward compatibility
- # It must be removed as soon as TestCase subclasses
- # stop setting result = 'PASS' or 'FAIL'.
- # In this case criteria is unread.
- self.__logger.warning(
- "Please update result which must be an int!")
- if self.result == 'PASS':
- return TestCase.EX_OK
- except AssertionError:
- self.__logger.error("Please run test before checking the results")
- return TestCase.EX_TESTCASE_FAILED
-
- def run(self, **kwargs):
- """Run the test case.
-
- It allows running TestCase and getting its execution
- status.
-
- The subclasses must override the default implementation which
- is false on purpose.
-
- The new implementation must set the following attributes to
- push the results to DB:
-
- * result,
- * start_time,
- * stop_time.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- TestCase.EX_RUN_ERROR.
- """
- # pylint: disable=unused-argument
- self.__logger.error("Run must be implemented")
- return TestCase.EX_RUN_ERROR
-
- @decorators.can_dump_request_to_file
- def push_to_db(self):
- """Push the results of the test case to the DB.
-
- It allows publishing the results and checking the status.
-
- It could be overriden if the common implementation is not
- suitable.
-
- The following attributes must be set before pushing the results to DB:
-
- * project_name,
- * case_name,
- * result,
- * start_time,
- * stop_time.
-
- The next vars must be set in env:
-
- * TEST_DB_URL,
- * INSTALLER_TYPE,
- * DEPLOY_SCENARIO,
- * NODE_NAME,
- * BUILD_TAG.
-
- Returns:
- TestCase.EX_OK if results were pushed to DB.
- TestCase.EX_PUSH_TO_DB_ERROR otherwise.
- """
- try:
- assert self.project_name
- assert self.case_name
- assert self.start_time
- assert self.stop_time
- url = env.get('TEST_DB_URL')
- data = {"project_name": self.project_name,
- "case_name": self.case_name,
- "details": self.details}
- data["installer"] = env.get('INSTALLER_TYPE')
- data["scenario"] = env.get('DEPLOY_SCENARIO')
- data["pod_name"] = env.get('NODE_NAME')
- data["build_tag"] = env.get('BUILD_TAG')
- data["criteria"] = 'PASS' if self.is_successful(
- ) == TestCase.EX_OK else 'FAIL'
- data["start_date"] = datetime.fromtimestamp(
- self.start_time).strftime('%Y-%m-%d %H:%M:%S')
- data["stop_date"] = datetime.fromtimestamp(
- self.stop_time).strftime('%Y-%m-%d %H:%M:%S')
- try:
- data["version"] = re.search(
- TestCase._job_name_rule,
- env.get('BUILD_TAG')).group(2)
- except Exception: # pylint: disable=broad-except
- data["version"] = "unknown"
- req = requests.post(
- url, data=json.dumps(data, sort_keys=True),
- headers=self._headers)
- req.raise_for_status()
- self.__logger.info(
- "The results were successfully pushed to DB %s", url)
- except AssertionError:
- self.__logger.exception(
- "Please run test before publishing the results")
- return TestCase.EX_PUSH_TO_DB_ERROR
- except requests.exceptions.HTTPError:
- self.__logger.exception("The HTTP request raises issues")
- return TestCase.EX_PUSH_TO_DB_ERROR
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("The results cannot be pushed to DB")
- return TestCase.EX_PUSH_TO_DB_ERROR
- return TestCase.EX_OK
-
- def clean(self):
- """Clean the resources.
-
- It can be overriden if resources must be deleted after
- running the test case.
- """
diff --git a/functest/core/unit.py b/functest/core/unit.py
deleted file mode 100644
index 61b5a58d..00000000
--- a/functest/core/unit.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Cable Television Laboratories, Inc. and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class to run unittest.TestSuite as TestCase."""
-
-from __future__ import division
-
-import logging
-import time
-import unittest
-
-import six
-
-from functest.core import testcase
-
-__author__ = ("Steven Pisarski <s.pisarski@cablelabs.com>, "
- "Cedric Ollivier <cedric.ollivier@orange.com>")
-
-
-class Suite(testcase.TestCase):
- """Base model for running unittest.TestSuite."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- super(Suite, self).__init__(**kwargs)
- self.suite = None
-
- def run(self, **kwargs):
- """Run the test suite.
-
- It allows running any unittest.TestSuite and getting its
- execution status.
-
- By default, it runs the suite defined as instance attribute.
- It can be overriden by passing name as arg. It must
- conform with TestLoader.loadTestsFromName().
-
- It sets the following attributes required to push the results
- to DB:
-
- * result,
- * start_time,
- * stop_time,
- * details.
-
- Args:
- kwargs: Arbitrary keyword arguments.
-
- Returns:
- TestCase.EX_OK if any TestSuite has been run,
- TestCase.EX_RUN_ERROR otherwise.
- """
- try:
- name = kwargs["name"]
- try:
- self.suite = unittest.TestLoader().loadTestsFromName(name)
- except ImportError:
- self.__logger.error("Can not import %s", name)
- return testcase.TestCase.EX_RUN_ERROR
- except KeyError:
- pass
- try:
- assert self.suite
- self.start_time = time.time()
- stream = six.StringIO()
- result = unittest.TextTestRunner(
- stream=stream, verbosity=2).run(self.suite)
- self.__logger.debug("\n\n%s", stream.getvalue())
- self.stop_time = time.time()
- self.details = {
- "testsRun": result.testsRun,
- "failures": len(result.failures),
- "errors": len(result.errors),
- "stream": stream.getvalue()}
- self.result = 100 * (
- (result.testsRun - (len(result.failures) +
- len(result.errors))) /
- result.testsRun)
- return testcase.TestCase.EX_OK
- except AssertionError:
- self.__logger.error("No suite is defined")
- return testcase.TestCase.EX_RUN_ERROR
- except ZeroDivisionError:
- self.__logger.error("No test has been run")
- return testcase.TestCase.EX_RUN_ERROR
diff --git a/functest/core/vnf.py b/functest/core/vnf.py
deleted file mode 100644
index cf20492b..00000000
--- a/functest/core/vnf.py
+++ /dev/null
@@ -1,205 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the parent class of all VNF TestCases."""
-
-import logging
-import time
-import uuid
-
-from snaps.config.user import UserConfig
-from snaps.config.project import ProjectConfig
-from snaps.openstack.create_user import OpenStackUser
-from snaps.openstack.create_project import OpenStackProject
-from snaps.openstack.tests import openstack_tests
-
-from functest.core import testcase
-from functest.utils import constants
-
-__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, "
- "Valentin Boucher <valentin.boucher@orange.com>")
-
-
-class VnfPreparationException(Exception):
- """Raise when VNF preparation cannot be executed."""
-
-
-class OrchestratorDeploymentException(Exception):
- """Raise when orchestrator cannot be deployed."""
-
-
-class VnfDeploymentException(Exception):
- """Raise when VNF cannot be deployed."""
-
-
-class VnfTestException(Exception):
- """Raise when VNF cannot be tested."""
-
-
-class VnfOnBoarding(testcase.TestCase):
- # pylint: disable=too-many-instance-attributes
- """Base model for VNF test cases."""
-
- __logger = logging.getLogger(__name__)
-
- def __init__(self, **kwargs):
- super(VnfOnBoarding, self).__init__(**kwargs)
- self.uuid = uuid.uuid4()
- self.user_name = "{}-{}".format(self.case_name, self.uuid)
- self.tenant_name = "{}-{}".format(self.case_name, self.uuid)
- self.snaps_creds = {}
- self.created_object = []
- self.os_project = None
- self.tenant_description = "Created by OPNFV Functest: {}".format(
- self.case_name)
-
- def run(self, **kwargs):
- """
- Run of the VNF test case:
-
- * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP,...),
- * Deploy the VNF,
- * Perform tests on the VNF
-
- A VNF test case is successfull when the 3 steps are PASS
- If one of the step is FAIL, the test case is FAIL
-
- Returns:
- TestCase.EX_OK if result is 'PASS'.
- TestCase.EX_TESTCASE_FAILED otherwise.
- """
- self.start_time = time.time()
-
- try:
- self.prepare()
- if (self.deploy_orchestrator() and
- self.deploy_vnf() and
- self.test_vnf()):
- self.stop_time = time.time()
- # Calculation with different weight depending on the steps TODO
- self.result = 100
- return testcase.TestCase.EX_OK
- self.result = 0
- self.stop_time = time.time()
- return testcase.TestCase.EX_TESTCASE_FAILED
- except Exception: # pylint: disable=broad-except
- self.stop_time = time.time()
- self.__logger.exception("Exception on VNF testing")
- return testcase.TestCase.EX_TESTCASE_FAILED
-
- def prepare(self):
- """
- Prepare the environment for VNF testing:
-
- * Creation of a user,
- * Creation of a tenant,
- * Allocation admin role to the user on this tenant
-
- Returns base.TestCase.EX_OK if preparation is successfull
-
- Raise VnfPreparationException in case of problem
- """
- try:
- self.__logger.info(
- "Prepare VNF: %s, description: %s", self.case_name,
- self.tenant_description)
- snaps_creds = openstack_tests.get_credentials(
- os_env_file=constants.ENV_FILE)
-
- self.os_project = OpenStackProject(
- snaps_creds,
- ProjectConfig(
- name=self.tenant_name,
- description=self.tenant_description
- ))
- self.os_project.create()
- self.created_object.append(self.os_project)
- user_creator = OpenStackUser(
- snaps_creds,
- UserConfig(
- name=self.user_name,
- password=str(uuid.uuid4()),
- roles={'admin': self.tenant_name}))
- user_creator.create()
- self.created_object.append(user_creator)
- self.snaps_creds = user_creator.get_os_creds(self.tenant_name)
-
- return testcase.TestCase.EX_OK
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Exception raised during VNF preparation")
- raise VnfPreparationException
-
- def deploy_orchestrator(self):
- """
- Deploy an orchestrator (optional).
-
- If this method is overriden then raise orchestratorDeploymentException
- if error during orchestrator deployment
- """
- self.__logger.info("Deploy orchestrator (if necessary)")
- return True
-
- def deploy_vnf(self):
- """
- Deploy the VNF
-
- This function MUST be implemented by vnf test cases.
- The details section MAY be updated in the vnf test cases.
-
- The deployment can be executed via a specific orchestrator
- or using build-in orchestrators such as heat, OpenBaton, cloudify,
- juju, onap, ...
-
- Returns:
- True if the VNF is properly deployed
- False if the VNF is not deployed
-
- Raise VnfDeploymentException if error during VNF deployment
- """
- self.__logger.error("VNF must be deployed")
- raise VnfDeploymentException
-
- def test_vnf(self):
- """
- Test the VNF
-
- This function MUST be implemented by vnf test cases.
- The details section MAY be updated in the vnf test cases.
-
- Once a VNF is deployed, it is assumed that specific test suite can be
- run to validate the VNF.
- Please note that the same test suite can be used on several test case
- (e.g. clearwater test suite can be used whatever the orchestrator used
- for the deployment)
-
- Returns:
- True if VNF tests are PASS
- False if test suite is FAIL
-
- Raise VnfTestException if error during VNF test
- """
- self.__logger.error("VNF must be tested")
- raise VnfTestException
-
- def clean(self):
- """
- Clean VNF test case.
-
- It is up to the test providers to delete resources used for the tests.
- By default we clean:
-
- * the user,
- * the tenant
- """
- self.__logger.info('Removing the VNF resources ..')
- for creator in reversed(self.created_object):
- try:
- creator.clean()
- except Exception as exc: # pylint: disable=broad-except
- self.__logger.error('Unexpected error cleaning - %s', exc)
diff --git a/functest/energy/__init__.py b/functest/energy/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/energy/__init__.py
+++ /dev/null
diff --git a/functest/energy/energy.py b/functest/energy/energy.py
deleted file mode 100644
index a2652211..00000000
--- a/functest/energy/energy.py
+++ /dev/null
@@ -1,336 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""This module manages calls to Energy recording API."""
-
-import json
-import logging
-import traceback
-
-from functools import wraps
-import requests
-from six.moves import urllib
-
-from functest.utils import env
-
-
-def finish_session(current_scenario):
- """Finish a recording session."""
- if current_scenario is None:
- EnergyRecorder.stop()
- else:
- EnergyRecorder.logger.debug("Restoring previous scenario (%s/%s)",
- current_scenario["scenario"],
- current_scenario["step"])
- EnergyRecorder.submit_scenario(
- current_scenario["scenario"],
- current_scenario["step"]
- )
-
-
-def enable_recording(method):
- """
- Record energy during method execution.
-
- Decorator to record energy during "method" exection.
-
- param method: Method to suround with start and stop
- :type method: function
-
- .. note:: "method" should belong to a class having a "case_name"
- attribute
- """
- @wraps(method)
- def wrapper(*args):
- """
- Record energy during method execution (implementation).
-
- Wrapper for decorator to handle method arguments.
- """
- current_scenario = EnergyRecorder.get_current_scenario()
- EnergyRecorder.start(args[0].case_name)
- try:
- return_value = method(*args)
- finish_session(current_scenario)
- except Exception as exc: # pylint: disable=broad-except
- EnergyRecorder.logger.exception(exc)
- finish_session(current_scenario)
- raise exc
- return return_value
- return wrapper
-
-
-# Class to manage energy recording sessions
-class EnergyRecorder(object):
- """Manage Energy recording session."""
-
- logger = logging.getLogger(__name__)
- # Energy recording API connectivity settings
- # see load_config method
- energy_recorder_api = None
-
- # Default initial step
- INITIAL_STEP = "running"
-
- # Default connection timeout
- CONNECTION_TIMEOUT = 4
-
- @staticmethod
- def load_config():
- """
- Load connectivity settings from yaml.
-
- Load connectivity settings to Energy recording API
- Use functest global config yaml file
- (see functest_utils.get_functest_config)
- """
- # Singleton pattern for energy_recorder_api static member
- # Load only if not previouly done
- if EnergyRecorder.energy_recorder_api is None:
- assert env.get('NODE_NAME')
- assert env.get('ENERGY_RECORDER_API_URL')
- environment = env.get('NODE_NAME')
- energy_recorder_uri = env.get(
- 'ENERGY_RECORDER_API_URL')
-
- # Creds
- creds_usr = env.get("ENERGY_RECORDER_API_USER")
- creds_pass = env.get("ENERGY_RECORDER_API_PASSWORD")
-
- uri_comp = "/recorders/environment/"
- uri_comp += urllib.parse.quote_plus(environment)
-
- if creds_usr and creds_pass:
- energy_recorder_api_auth = (creds_usr, creds_pass)
- else:
- energy_recorder_api_auth = None
-
- try:
- resp = requests.get(energy_recorder_uri + "/monitoring/ping",
- auth=energy_recorder_api_auth,
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT)
- api_available = json.loads(resp.text)["status"] == "OK"
- EnergyRecorder.logger.info(
- "API recorder available at : %s",
- energy_recorder_uri + uri_comp)
- except Exception as exc: # pylint: disable=broad-except
- EnergyRecorder.logger.info(
- "Energy recorder API is not available, cause=%s",
- str(exc))
- api_available = False
- # Final config
- EnergyRecorder.energy_recorder_api = {
- "uri": energy_recorder_uri + uri_comp,
- "auth": energy_recorder_api_auth,
- "available": api_available
- }
- return EnergyRecorder.energy_recorder_api["available"]
-
- @staticmethod
- def submit_scenario(scenario, step):
- """
- Submit a complet scenario definition to Energy recorder API.
-
- param scenario: Scenario name
- :type scenario: string
- param step: Step name
- :type step: string
- """
- try:
- return_status = True
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Submitting scenario (%s/%s)",
- scenario, step)
-
- # Create API payload
- payload = {
- "step": step,
- "scenario": scenario
- }
- # Call API to start energy recording
- response = requests.post(
- EnergyRecorder.energy_recorder_api["uri"],
- data=json.dumps(payload),
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code != 200:
- EnergyRecorder.logger.error(
- "Error while submitting scenario\n%s",
- response.text)
- return_status = False
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "submit_scenario: Unable to connect energy recorder API")
- return_status = False
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while submitting scenarion to energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def start(scenario):
- """
- Start a recording session for scenario.
-
- param scenario: Starting scenario
- :type scenario: string
- """
- return_status = True
- try:
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Starting recording")
- return_status = EnergyRecorder.submit_scenario(
- scenario,
- EnergyRecorder.INITIAL_STEP
- )
-
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while starting energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def stop():
- """Stop current recording session."""
- return_status = True
- try:
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Stopping recording")
-
- # Call API to stop energy recording
- response = requests.delete(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code != 200:
- EnergyRecorder.logger.error(
- "Error while stating energy recording session\n%s",
- response.text)
- return_status = False
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "stop: Unable to connect energy recorder API")
- return_status = False
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while stoping energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def set_step(step):
- """Notify energy recording service of current step of the testcase."""
- return_status = True
- try:
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Setting step")
-
- # Create API payload
- payload = {
- "step": step,
- }
-
- # Call API to define step
- response = requests.post(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- data=json.dumps(payload),
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code != 200:
- EnergyRecorder.logger.error(
- "Error while setting current step of testcase\n%s",
- response.text)
- return_status = False
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "set_step: Unable to connect energy recorder API")
- return_status = False
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while setting step on energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def get_current_scenario():
- """Get current running scenario (if any, None else)."""
- return_value = None
- try:
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Getting current scenario")
-
- # Call API get running scenario
- response = requests.get(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code == 200:
- return_value = json.loads(response.text)
- elif response.status_code == 404:
- EnergyRecorder.logger.info(
- "No current running scenario at %s",
- EnergyRecorder.energy_recorder_api["uri"])
- return_value = None
- else:
- EnergyRecorder.logger.error(
- "Error while getting current scenario\n%s",
- response.text)
- return_value = None
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "get_currernt_sceario: Unable to connect energy recorder API")
- return_value = None
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while getting current scenario from energy recorder API"
- "\n%s", traceback.format_exc()
- )
- return_value = None
- return return_value
diff --git a/functest/tests/__init__.py b/functest/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/tests/__init__.py
+++ /dev/null
diff --git a/functest/tests/unit/__init__.py b/functest/tests/unit/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/tests/unit/__init__.py
+++ /dev/null
diff --git a/functest/tests/unit/ci/__init__.py b/functest/tests/unit/ci/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/tests/unit/ci/__init__.py
+++ /dev/null
diff --git a/functest/tests/unit/ci/test_run_tests.py b/functest/tests/unit/ci/test_run_tests.py
deleted file mode 100644
index b8dca20c..00000000
--- a/functest/tests/unit/ci/test_run_tests.py
+++ /dev/null
@@ -1,267 +0,0 @@
-#!/usr/bin/env python
-
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import unittest
-import os
-
-import mock
-
-from functest.ci import run_tests
-from functest.core.testcase import TestCase
-
-
-class FakeModule(TestCase):
-
- def run(self, **kwargs):
- return TestCase.EX_OK
-
-
-class RunTestsTesting(unittest.TestCase):
-
- def setUp(self):
- self.runner = run_tests.Runner()
- mock_test_case = mock.Mock()
- mock_test_case.is_successful.return_value = TestCase.EX_OK
- self.runner.executed_test_cases['test1'] = mock_test_case
- self.runner.executed_test_cases['test2'] = mock_test_case
- self.sep = 'test_sep'
- self.creds = {'OS_AUTH_URL': 'http://test_ip:test_port/v2.0',
- 'OS_USERNAME': 'test_os_username',
- 'OS_TENANT_NAME': 'test_tenant',
- 'OS_PASSWORD': 'test_password'}
- self.test = {'test_name': 'test_name'}
- self.tier = mock.Mock()
- test1 = mock.Mock()
- test1.get_name.return_value = 'test1'
- test2 = mock.Mock()
- test2.get_name.return_value = 'test2'
- attrs = {'get_name.return_value': 'test_tier',
- 'get_tests.return_value': [test1, test2],
- 'get_ci_loop.return_value': 'test_ci_loop',
- 'get_test_names.return_value': ['test1', 'test2']}
- self.tier.configure_mock(**attrs)
-
- self.tiers = mock.Mock()
- attrs = {'get_tiers.return_value': [self.tier]}
- self.tiers.configure_mock(**attrs)
-
- self.run_tests_parser = run_tests.RunTestsParser()
-
- @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test')
- def test_get_run_dict(self, *args):
- retval = {'run': mock.Mock()}
- args[0].return_value = retval
- self.assertEqual(self.runner.get_run_dict('test_name'), retval['run'])
- args[0].assert_called_once_with('test_name')
-
- @mock.patch('functest.ci.run_tests.LOGGER.error')
- @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test',
- return_value=None)
- def test_get_run_dict_config_ko(self, *args):
- testname = 'test_name'
- self.assertEqual(self.runner.get_run_dict(testname), None)
- args[0].return_value = {}
- self.assertEqual(self.runner.get_run_dict(testname), None)
- calls = [mock.call(testname), mock.call(testname)]
- args[0].assert_has_calls(calls)
- calls = [mock.call("Cannot get %s's config options", testname),
- mock.call("Cannot get %s's config options", testname)]
- args[1].assert_has_calls(calls)
-
- @mock.patch('functest.ci.run_tests.LOGGER.exception')
- @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test',
- side_effect=Exception)
- def test_get_run_dict_exception(self, *args):
- testname = 'test_name'
- self.assertEqual(self.runner.get_run_dict(testname), None)
- args[1].assert_called_once_with(
- "Cannot get %s's config options", testname)
-
- def _test_source_envfile(self, msg, key='OS_TENANT_NAME', value='admin'):
- try:
- del os.environ[key]
- except Exception: # pylint: disable=broad-except
- pass
- envfile = 'rc_file'
- with mock.patch('six.moves.builtins.open',
- mock.mock_open(read_data=msg)) as mock_method,\
- mock.patch('os.path.isfile', return_value=True):
- mock_method.return_value.__iter__ = lambda self: iter(
- self.readline, '')
- self.runner.source_envfile(envfile)
- mock_method.assert_called_once_with(envfile, 'r')
- self.assertEqual(os.environ[key], value)
-
- def test_source_envfile(self):
- self._test_source_envfile('OS_TENANT_NAME=admin')
- self._test_source_envfile('OS_TENANT_NAME= admin')
- self._test_source_envfile('OS_TENANT_NAME = admin')
- self._test_source_envfile('OS_TENANT_NAME = "admin"')
- self._test_source_envfile('export OS_TENANT_NAME=admin')
- self._test_source_envfile('export OS_TENANT_NAME =admin')
- self._test_source_envfile('export OS_TENANT_NAME = admin')
- self._test_source_envfile('export OS_TENANT_NAME = "admin"')
- # This test will fail as soon as rc_file is fixed
- self._test_source_envfile(
- 'export "\'OS_TENANT_NAME\'" = "\'admin\'"')
-
- def test_get_dict_by_test(self):
- with mock.patch('six.moves.builtins.open', mock.mock_open()), \
- mock.patch('yaml.safe_load') as mock_yaml:
- mock_obj = mock.Mock()
- testcase_dict = {'case_name': 'testname',
- 'criteria': 50}
- attrs = {'get.return_value': [{'testcases': [testcase_dict]}]}
- mock_obj.configure_mock(**attrs)
- mock_yaml.return_value = mock_obj
- self.assertDictEqual(
- run_tests.Runner.get_dict_by_test('testname'),
- testcase_dict)
-
- @mock.patch('functest.ci.run_tests.Runner.get_run_dict',
- return_value=None)
- def test_run_tests_import_exception(self, *args):
- mock_test = mock.Mock()
- kwargs = {'get_name.return_value': 'test_name',
- 'needs_clean.return_value': False}
- mock_test.configure_mock(**kwargs)
- with self.assertRaises(Exception) as context:
- self.runner.run_test(mock_test)
- args[0].assert_called_with('test_name')
- msg = "Cannot import the class for the test case."
- self.assertTrue(msg in str(context.exception))
-
- @mock.patch('importlib.import_module', name="module",
- return_value=mock.Mock(test_class=mock.Mock(
- side_effect=FakeModule)))
- @mock.patch('functest.ci.run_tests.Runner.get_dict_by_test')
- def test_run_tests_default(self, *args):
- mock_test = mock.Mock()
- kwargs = {'get_name.return_value': 'test_name',
- 'needs_clean.return_value': True}
- mock_test.configure_mock(**kwargs)
- test_run_dict = {'module': 'test_module',
- 'class': 'test_class'}
- with mock.patch('functest.ci.run_tests.Runner.get_run_dict',
- return_value=test_run_dict):
- self.runner.clean_flag = True
- self.runner.run_test(mock_test)
- args[0].assert_called_with('test_name')
- args[1].assert_called_with('test_module')
- self.assertEqual(self.runner.overall_result,
- run_tests.Result.EX_OK)
-
- @mock.patch('functest.ci.run_tests.Runner.run_test',
- return_value=TestCase.EX_OK)
- def test_run_tier_default(self, *mock_methods):
- self.assertEqual(self.runner.run_tier(self.tier),
- run_tests.Result.EX_OK)
- mock_methods[0].assert_called_with(mock.ANY)
-
- @mock.patch('functest.ci.run_tests.LOGGER.info')
- def test_run_tier_missing_test(self, mock_logger_info):
- self.tier.get_tests.return_value = None
- self.assertEqual(self.runner.run_tier(self.tier),
- run_tests.Result.EX_ERROR)
- self.assertTrue(mock_logger_info.called)
-
- @mock.patch('functest.ci.run_tests.LOGGER.info')
- @mock.patch('functest.ci.run_tests.Runner.run_tier')
- @mock.patch('functest.ci.run_tests.Runner.summary')
- def test_run_all_default(self, *mock_methods):
- os.environ['CI_LOOP'] = 'test_ci_loop'
- self.runner.run_all()
- mock_methods[1].assert_not_called()
- self.assertTrue(mock_methods[2].called)
-
- @mock.patch('functest.ci.run_tests.LOGGER.info')
- @mock.patch('functest.ci.run_tests.Runner.summary')
- def test_run_all_missing_tier(self, *mock_methods):
- os.environ['CI_LOOP'] = 'loop_re_not_available'
- self.runner.run_all()
- self.assertTrue(mock_methods[1].called)
-
- @mock.patch('functest.ci.run_tests.Runner.source_envfile',
- side_effect=Exception)
- @mock.patch('functest.ci.run_tests.Runner.summary')
- def test_main_failed(self, *mock_methods):
- kwargs = {'test': 'test_name', 'noclean': True, 'report': True}
- args = {'get_tier.return_value': False,
- 'get_test.return_value': False}
- self.runner.tiers = mock.Mock()
- self.runner.tiers.configure_mock(**args)
- self.assertEqual(self.runner.main(**kwargs),
- run_tests.Result.EX_ERROR)
- mock_methods[1].assert_called_once_with()
-
- @mock.patch('functest.ci.run_tests.Runner.source_envfile')
- @mock.patch('functest.ci.run_tests.Runner.run_test',
- return_value=TestCase.EX_OK)
- @mock.patch('functest.ci.run_tests.Runner.summary')
- def test_main_tier(self, *mock_methods):
- mock_tier = mock.Mock()
- test_mock = mock.Mock()
- test_mock.get_name.return_value = 'test1'
- args = {'get_name.return_value': 'tier_name',
- 'get_tests.return_value': [test_mock]}
- mock_tier.configure_mock(**args)
- kwargs = {'test': 'tier_name', 'noclean': True, 'report': True}
- args = {'get_tier.return_value': mock_tier,
- 'get_test.return_value': None}
- self.runner.tiers = mock.Mock()
- self.runner.tiers.configure_mock(**args)
- self.assertEqual(self.runner.main(**kwargs),
- run_tests.Result.EX_OK)
- mock_methods[1].assert_called()
-
- @mock.patch('functest.ci.run_tests.Runner.source_envfile')
- @mock.patch('functest.ci.run_tests.Runner.run_test',
- return_value=TestCase.EX_OK)
- def test_main_test(self, *mock_methods):
- kwargs = {'test': 'test_name', 'noclean': True, 'report': True}
- args = {'get_tier.return_value': None,
- 'get_test.return_value': 'test_name'}
- self.runner.tiers = mock.Mock()
- mock_methods[1].return_value = self.creds
- self.runner.tiers.configure_mock(**args)
- self.assertEqual(self.runner.main(**kwargs),
- run_tests.Result.EX_OK)
- mock_methods[0].assert_called_once_with('test_name')
-
- @mock.patch('functest.ci.run_tests.Runner.source_envfile')
- @mock.patch('functest.ci.run_tests.Runner.run_all')
- @mock.patch('functest.ci.run_tests.Runner.summary')
- def test_main_all_tier(self, *args):
- kwargs = {'get_tier.return_value': None,
- 'get_test.return_value': None}
- self.runner.tiers = mock.Mock()
- self.runner.tiers.configure_mock(**kwargs)
- self.assertEqual(
- self.runner.main(test='all', noclean=True, report=True),
- run_tests.Result.EX_OK)
- args[0].assert_called_once_with(None)
- args[1].assert_called_once_with()
- args[2].assert_called_once_with()
-
- @mock.patch('functest.ci.run_tests.Runner.source_envfile')
- def test_main_any_tier_test_ko(self, *args):
- kwargs = {'get_tier.return_value': None,
- 'get_test.return_value': None}
- self.runner.tiers = mock.Mock()
- self.runner.tiers.configure_mock(**kwargs)
- self.assertEqual(
- self.runner.main(test='any', noclean=True, report=True),
- run_tests.Result.EX_ERROR)
- args[0].assert_called_once_with()
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_tier_builder.py b/functest/tests/unit/ci/test_tier_builder.py
deleted file mode 100644
index ef6a007b..00000000
--- a/functest/tests/unit/ci/test_tier_builder.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#!/usr/bin/env python
-
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import unittest
-
-import mock
-
-from functest.ci import tier_builder
-
-
-class TierBuilderTesting(unittest.TestCase):
-
- def setUp(self):
- self.dependency = {
- 'installer': 'test_installer', 'scenario': 'test_scenario'}
- self.testcase = {
- 'dependencies': self.dependency, 'enabled': 'true',
- 'case_name': 'test_name', 'criteria': 'test_criteria',
- 'blocking': 'test_blocking', 'description': 'test_desc',
- 'project_name': 'project_name'}
- self.dic_tier = {
- 'name': 'test_tier', 'order': 'test_order',
- 'ci_loop': 'test_ci_loop', 'description': 'test_desc',
- 'testcases': [self.testcase]}
- self.mock_yaml = mock.Mock()
- attrs = {'get.return_value': [self.dic_tier]}
- self.mock_yaml.configure_mock(**attrs)
-
- with mock.patch('functest.ci.tier_builder.yaml.safe_load',
- return_value=self.mock_yaml), \
- mock.patch('six.moves.builtins.open', mock.mock_open()):
- self.tierbuilder = tier_builder.TierBuilder(
- 'test_installer', 'test_scenario', 'testcases_file')
- self.tier_obj = self.tierbuilder.tier_objects[0]
-
- def test_get_tiers(self):
- self.assertEqual(self.tierbuilder.get_tiers(),
- [self.tier_obj])
-
- def test_get_tier_names(self):
- self.assertEqual(self.tierbuilder.get_tier_names(),
- ['test_tier'])
-
- def test_get_tier_present_tier(self):
- self.assertEqual(self.tierbuilder.get_tier('test_tier'),
- self.tier_obj)
-
- def test_get_tier_missing_tier(self):
- self.assertEqual(self.tierbuilder.get_tier('test_tier2'),
- None)
-
- def test_get_test_present_test(self):
- self.assertEqual(self.tierbuilder.get_test('test_name'),
- self.tier_obj.get_test('test_name'))
-
- def test_get_test_missing_test(self):
- self.assertEqual(self.tierbuilder.get_test('test_name2'),
- None)
-
- def test_get_tests_present_tier(self):
- self.assertEqual(self.tierbuilder.get_tests('test_tier'),
- self.tier_obj.tests_array)
-
- def test_get_tests_missing_tier(self):
- self.assertEqual(self.tierbuilder.get_tests('test_tier2'),
- None)
-
- def test_get_tier_name_ok(self):
- self.assertEqual(self.tierbuilder.get_tier_name('test_name'),
- 'test_tier')
-
- def test_get_tier_name_ko(self):
- self.assertEqual(self.tierbuilder.get_tier_name('test_name2'), None)
-
- def test_str(self):
- message = str(self.tierbuilder)
- self.assertTrue('test_tier' in message)
- self.assertTrue('test_order' in message)
- self.assertTrue('test_ci_loop' in message)
- self.assertTrue('test_desc' in message)
- self.assertTrue('test_name' in message)
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/ci/test_tier_handler.py b/functest/tests/unit/ci/test_tier_handler.py
deleted file mode 100644
index 5e784128..00000000
--- a/functest/tests/unit/ci/test_tier_handler.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import unittest
-
-import mock
-
-from functest.ci import tier_handler
-
-
-class TierHandlerTesting(unittest.TestCase):
- # pylint: disable=too-many-public-methods
-
- def setUp(self):
- self.test = mock.Mock()
- attrs = {'get_name.return_value': 'test_name'}
- self.test.configure_mock(**attrs)
- self.mock_depend = mock.Mock()
- attrs = {'get_scenario.return_value': 'test_scenario',
- 'get_installer.return_value': 'test_installer'}
- self.mock_depend.configure_mock(**attrs)
- self.tier = tier_handler.Tier(
- 'test_tier', 'test_order', 'test_ci_loop', description='test_desc')
- self.testcase = tier_handler.TestCase(
- 'test_name', 'true', self.mock_depend, 'test_criteria',
- True, description='test_desc', project='project_name')
- self.dependency = tier_handler.Dependency(
- 'test_installer', 'test_scenario')
- self.testcase.str = self.testcase.__str__()
- self.dependency.str = self.dependency.__str__()
- self.tier.str = self.tier.__str__()
-
- def test_split_text(self):
- test_str = 'this is for testing'
- self.assertEqual(tier_handler.split_text(test_str, 10),
- ['this is ', 'for ', 'testing '])
-
- def test_add_test(self):
- self.tier.add_test(self.test)
- self.assertEqual(self.tier.tests_array, [self.test])
-
- def test_get_skipped_test1(self):
- self.assertEqual(self.tier.get_skipped_test(), [])
-
- def test_get_skipped_test2(self):
- self.tier.skip_test(self.test)
- self.assertEqual(self.tier.get_skipped_test(), [self.test])
-
- def test_get_tests(self):
- self.tier.tests_array = [self.test]
- self.assertEqual(self.tier.get_tests(), [self.test])
-
- def test_get_test_names(self):
- self.tier.tests_array = [self.test]
- self.assertEqual(self.tier.get_test_names(), ['test_name'])
-
- def test_get_test(self):
- self.tier.tests_array = [self.test]
- with mock.patch.object(self.tier, 'is_test', return_value=True):
- self.assertEqual(self.tier.get_test('test_name'), self.test)
-
- def test_get_test_missing_test(self):
- self.tier.tests_array = [self.test]
- with mock.patch.object(self.tier, 'is_test', return_value=False):
- self.assertEqual(self.tier.get_test('test_name'), None)
-
- def test_get_name(self):
- self.assertEqual(self.tier.get_name(), 'test_tier')
-
- def test_get_order(self):
- self.assertEqual(self.tier.get_order(), 'test_order')
-
- def test_get_ci_loop(self):
- self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop')
-
- def test_testcase_is_none_in_item(self):
- self.assertEqual(tier_handler.TestCase.is_none("item"), False)
-
- def test_testcase_is_none_no_item(self):
- self.assertEqual(tier_handler.TestCase.is_none(None), True)
-
- def test_testcase_is_compatible(self):
- self.assertEqual(
- self.testcase.is_compatible('test_installer', 'test_scenario'),
- True)
-
- def test_testcase_is_compatible_2(self):
- self.assertEqual(
- self.testcase.is_compatible('missing_installer', 'test_scenario'),
- False)
- self.assertEqual(
- self.testcase.is_compatible('test_installer', 'missing_scenario'),
- False)
-
- @mock.patch('re.search', side_effect=TypeError)
- def test_testcase_is_compatible3(self, *args):
- self.assertEqual(
- self.testcase.is_compatible('test_installer', 'test_scenario'),
- False)
- args[0].assert_called_once_with('test_installer', 'test_installer')
-
- def test_testcase_get_name(self):
- self.assertEqual(self.tier.get_name(), 'test_tier')
-
- def test_testcase_is_enabled(self):
- self.assertEqual(self.testcase.is_enabled(), 'true')
-
- def test_testcase_get_criteria(self):
- self.assertEqual(self.testcase.get_criteria(), 'test_criteria')
-
- def test_testcase_is_blocking(self):
- self.assertTrue(self.testcase.is_blocking())
-
- def test_testcase_get_project(self):
- self.assertEqual(self.testcase.get_project(), 'project_name')
-
- def test_testcase_get_order(self):
- self.assertEqual(self.tier.get_order(), 'test_order')
-
- def test_testcase_get_ci_loop(self):
- self.assertEqual(self.tier.get_ci_loop(), 'test_ci_loop')
-
- def test_dependency_get_installer(self):
- self.assertEqual(self.dependency.get_installer(), 'test_installer')
-
- def test_dependency_get_scenario(self):
- self.assertEqual(self.dependency.get_scenario(), 'test_scenario')
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/__init__.py b/functest/tests/unit/core/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/tests/unit/core/__init__.py
+++ /dev/null
diff --git a/functest/tests/unit/core/test_feature.py b/functest/tests/unit/core/test_feature.py
deleted file mode 100644
index 3219c726..00000000
--- a/functest/tests/unit/core/test_feature.py
+++ /dev/null
@@ -1,117 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import unittest
-
-import mock
-
-from functest.core import feature
-from functest.core import testcase
-
-
-class FeatureTestingBase(unittest.TestCase):
-
- _case_name = "foo"
- _project_name = "bar"
- _repo = "dir_repo_bar"
- _cmd = "run_bar_tests.py"
- _output_file = '/home/opnfv/functest/results/foo.log'
- feature = None
-
- @mock.patch('time.time', side_effect=[1, 2])
- def _test_run(self, status, mock_method=None):
- self.assertEqual(self.feature.run(cmd=self._cmd), status)
- if status == testcase.TestCase.EX_OK:
- self.assertEqual(self.feature.result, 100)
- else:
- self.assertEqual(self.feature.result, 0)
- mock_method.assert_has_calls([mock.call(), mock.call()])
- self.assertEqual(self.feature.start_time, 1)
- self.assertEqual(self.feature.stop_time, 2)
-
- def test_logger_module_ko(self):
- with mock.patch('six.moves.builtins.open'):
- self.feature = feature.Feature(
- project_name=self._project_name, case_name=self._case_name)
- self.assertEqual(self.feature.logger.name, self._case_name)
-
- def test_logger_module(self):
- with mock.patch('six.moves.builtins.open'):
- self.feature = feature.Feature(
- project_name=self._project_name, case_name=self._case_name,
- run={'module': 'bar'})
- self.assertEqual(self.feature.logger.name, 'bar')
-
-
-class FeatureTesting(FeatureTestingBase):
-
- def setUp(self):
- # logging must be disabled else it calls time.time()
- # what will break these unit tests.
- logging.disable(logging.CRITICAL)
- with mock.patch('six.moves.builtins.open'):
- self.feature = feature.Feature(
- project_name=self._project_name, case_name=self._case_name)
-
- def test_run_exc(self):
- # pylint: disable=bad-continuation
- with mock.patch.object(
- self.feature, 'execute',
- side_effect=Exception) as mock_method:
- self._test_run(testcase.TestCase.EX_RUN_ERROR)
- mock_method.assert_called_once_with(cmd=self._cmd)
-
- def test_run(self):
- self._test_run(testcase.TestCase.EX_RUN_ERROR)
-
-
-class BashFeatureTesting(FeatureTestingBase):
-
- def setUp(self):
- # logging must be disabled else it calls time.time()
- # what will break these unit tests.
- logging.disable(logging.CRITICAL)
- with mock.patch('six.moves.builtins.open'):
- self.feature = feature.BashFeature(
- project_name=self._project_name, case_name=self._case_name)
-
- @mock.patch('subprocess.Popen')
- def test_run_no_cmd(self, mock_subproc):
- self.assertEqual(
- self.feature.run(), testcase.TestCase.EX_RUN_ERROR)
- mock_subproc.assert_not_called()
-
- @mock.patch('subprocess.Popen')
- def test_run_ko(self, mock_subproc):
- with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen:
- mock_obj = mock.Mock()
- attrs = {'wait.return_value': 1}
- mock_obj.configure_mock(**attrs)
-
- mock_subproc.return_value = mock_obj
- self._test_run(testcase.TestCase.EX_RUN_ERROR)
- mopen.assert_called_once_with(self._output_file, "w+")
-
- @mock.patch('subprocess.Popen')
- def test_run(self, mock_subproc):
- with mock.patch('six.moves.builtins.open', mock.mock_open()) as mopen:
- mock_obj = mock.Mock()
- attrs = {'wait.return_value': 0}
- mock_obj.configure_mock(**attrs)
-
- mock_subproc.return_value = mock_obj
- self._test_run(testcase.TestCase.EX_OK)
- mopen.assert_called_once_with(self._output_file, "w+")
-
-
-if __name__ == "__main__":
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_robotframework.py b/functest/tests/unit/core/test_robotframework.py
deleted file mode 100644
index 28fd15f6..00000000
--- a/functest/tests/unit/core/test_robotframework.py
+++ /dev/null
@@ -1,199 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the classes required to fully cover robot."""
-
-import errno
-import logging
-import os
-import unittest
-
-import mock
-from robot.errors import DataError, RobotError
-from robot.result import model
-from robot.utils.robottime import timestamp_to_secs
-
-from functest.core import robotframework
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-
-class ResultVisitorTesting(unittest.TestCase):
-
- """The class testing ResultVisitor."""
- # pylint: disable=missing-docstring
-
- def setUp(self):
- self.visitor = robotframework.ResultVisitor()
-
- def test_empty(self):
- self.assertFalse(self.visitor.get_data())
-
- def test_ok(self):
- data = {'name': 'foo',
- 'parent': 'bar',
- 'status': 'PASS',
- 'starttime': "20161216 16:00:00.000",
- 'endtime': "20161216 16:00:01.000",
- 'elapsedtime': 1000,
- 'text': 'Hello, World!',
- 'critical': True}
- test = model.TestCase(
- name=data['name'], status=data['status'], message=data['text'],
- starttime=data['starttime'], endtime=data['endtime'])
- test.parent = mock.Mock()
- config = {'name': data['parent'],
- 'criticality.test_is_critical.return_value': data[
- 'critical']}
- test.parent.configure_mock(**config)
- self.visitor.visit_test(test)
- self.assertEqual(self.visitor.get_data(), [data])
-
-
-class ParseResultTesting(unittest.TestCase):
-
- """The class testing RobotFramework.parse_results()."""
- # pylint: disable=missing-docstring
-
- _config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000',
- 'endtime': '20161216 16:00:01.000'}
-
- def setUp(self):
- self.test = robotframework.RobotFramework(
- case_name='robot', project_name='functest')
-
- @mock.patch('robot.api.ExecutionResult', side_effect=DataError)
- def test_raises_exc(self, mock_method):
- with self.assertRaises(DataError):
- self.test.parse_results()
- mock_method.assert_called_once_with(
- os.path.join(self.test.res_dir, 'output.xml'))
-
- def _test_result(self, config, result):
- suite = mock.Mock()
- suite.configure_mock(**config)
- with mock.patch('robot.api.ExecutionResult',
- return_value=mock.Mock(suite=suite)):
- self.test.parse_results()
- self.assertEqual(self.test.result, result)
- self.assertEqual(self.test.start_time,
- timestamp_to_secs(config['starttime']))
- self.assertEqual(self.test.stop_time,
- timestamp_to_secs(config['endtime']))
- self.assertEqual(self.test.details,
- {'description': config['name'], 'tests': []})
-
- def test_null_passed(self):
- self._config.update({'statistics.critical.passed': 0,
- 'statistics.critical.total': 20})
- self._test_result(self._config, 0)
-
- def test_no_test(self):
- self._config.update({'statistics.critical.passed': 20,
- 'statistics.critical.total': 0})
- self._test_result(self._config, 0)
-
- def test_half_success(self):
- self._config.update({'statistics.critical.passed': 10,
- 'statistics.critical.total': 20})
- self._test_result(self._config, 50)
-
- def test_success(self):
- self._config.update({'statistics.critical.passed': 20,
- 'statistics.critical.total': 20})
- self._test_result(self._config, 100)
-
-
-class RunTesting(unittest.TestCase):
-
- """The class testing RobotFramework.run()."""
- # pylint: disable=missing-docstring
-
- suites = ["foo"]
- variable = []
- variablefile = []
-
- def setUp(self):
- self.test = robotframework.RobotFramework(
- case_name='robot', project_name='functest')
-
- def test_exc_key_error(self):
- self.assertEqual(self.test.run(), self.test.EX_RUN_ERROR)
-
- @mock.patch('robot.run')
- def _test_makedirs_exc(self, *args):
- with mock.patch.object(self.test, 'parse_results') as mock_method:
- self.assertEqual(
- self.test.run(
- suites=self.suites, variable=self.variable,
- variablefile=self.variablefile),
- self.test.EX_RUN_ERROR)
- args[0].assert_not_called()
- mock_method.asser_not_called()
-
- @mock.patch('os.makedirs', side_effect=Exception)
- def test_makedirs_exc(self, *args):
- self._test_makedirs_exc()
- args[0].assert_called_once_with(self.test.res_dir)
-
- @mock.patch('os.makedirs', side_effect=OSError)
- def test_makedirs_oserror(self, *args):
- self._test_makedirs_exc()
- args[0].assert_called_once_with(self.test.res_dir)
-
- @mock.patch('robot.run')
- def _test_makedirs(self, *args):
- with mock.patch.object(self.test, 'parse_results') as mock_method:
- self.assertEqual(
- self.test.run(suites=self.suites, variable=self.variable),
- self.test.EX_OK)
- args[0].assert_called_once_with(
- *self.suites, log='NONE', output=self.test.xml_file,
- report='NONE', stdout=mock.ANY, variable=self.variable,
- variablefile=self.variablefile)
- mock_method.assert_called_once_with()
-
- @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
- def test_makedirs_oserror17(self, *args):
- self._test_makedirs()
- args[0].assert_called_once_with(self.test.res_dir)
-
- @mock.patch('os.makedirs')
- def test_makedirs(self, *args):
- self._test_makedirs()
- args[0].assert_called_once_with(self.test.res_dir)
-
- @mock.patch('robot.run')
- def _test_parse_results(self, status, *args):
- self.assertEqual(
- self.test.run(
- suites=self.suites, variable=self.variable,
- variablefile=self.variablefile),
- status)
- args[0].assert_called_once_with(
- *self.suites, log='NONE', output=self.test.xml_file,
- report='NONE', stdout=mock.ANY, variable=self.variable,
- variablefile=self.variablefile)
-
- def test_parse_results_exc(self):
- with mock.patch.object(self.test, 'parse_results',
- side_effect=Exception) as mock_method:
- self._test_parse_results(self.test.EX_RUN_ERROR)
- mock_method.assert_called_once_with()
-
- def test_parse_results_robot_error(self):
- with mock.patch.object(self.test, 'parse_results',
- side_effect=RobotError('foo')) as mock_method:
- self._test_parse_results(self.test.EX_RUN_ERROR)
- mock_method.assert_called_once_with()
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_testcase.py b/functest/tests/unit/core/test_testcase.py
deleted file mode 100644
index e11e5ff7..00000000
--- a/functest/tests/unit/core/test_testcase.py
+++ /dev/null
@@ -1,277 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the class required to fully cover testcase."""
-
-from datetime import datetime
-import json
-import logging
-import os
-import unittest
-
-from functest.core import testcase
-
-import mock
-import requests
-
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-
-class TestCaseTesting(unittest.TestCase):
- """The class testing TestCase."""
-
- # pylint: disable=missing-docstring,too-many-public-methods
-
- _case_name = "base"
- _project_name = "functest"
- _published_result = "PASS"
- _test_db_url = "http://testresults.opnfv.org/test/api/v1/results"
- _headers = {'Content-Type': 'application/json'}
-
- def setUp(self):
- self.test = testcase.TestCase(case_name=self._case_name,
- project_name=self._project_name)
- self.test.start_time = 1
- self.test.stop_time = 2
- self.test.result = 100
- self.test.details = {"Hello": "World"}
- os.environ['TEST_DB_URL'] = TestCaseTesting._test_db_url
- os.environ['INSTALLER_TYPE'] = "installer_type"
- os.environ['DEPLOY_SCENARIO'] = "scenario"
- os.environ['NODE_NAME'] = "node_name"
- os.environ['BUILD_TAG'] = "foo-daily-master-bar"
-
- def test_run_unimplemented(self):
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_RUN_ERROR)
-
- def _test_pushdb_missing_attribute(self):
- self.assertEqual(self.test.push_to_db(),
- testcase.TestCase.EX_PUSH_TO_DB_ERROR)
-
- def test_pushdb_no_project_name(self):
- self.test.project_name = None
- self._test_pushdb_missing_attribute()
-
- def test_pushdb_no_case_name(self):
- self.test.case_name = None
- self._test_pushdb_missing_attribute()
-
- def test_pushdb_no_start_time(self):
- self.test.start_time = None
- self._test_pushdb_missing_attribute()
-
- def test_pushdb_no_stop_time(self):
- self.test.stop_time = None
- self._test_pushdb_missing_attribute()
-
- def _test_pushdb_missing_env(self, var):
- del os.environ[var]
- self.assertEqual(self.test.push_to_db(),
- testcase.TestCase.EX_PUSH_TO_DB_ERROR)
-
- def test_pushdb_no_db_url(self):
- self._test_pushdb_missing_env('TEST_DB_URL')
-
- def test_pushdb_no_installer_type(self):
- self._test_pushdb_missing_env('INSTALLER_TYPE')
-
- def test_pushdb_no_deploy_scenario(self):
- self._test_pushdb_missing_env('DEPLOY_SCENARIO')
-
- def test_pushdb_no_node_name(self):
- self._test_pushdb_missing_env('NODE_NAME')
-
- def test_pushdb_no_build_tag(self):
- self._test_pushdb_missing_env('BUILD_TAG')
-
- @mock.patch('requests.post')
- def test_pushdb_bad_start_time(self, mock_function=None):
- self.test.start_time = "1"
- self.assertEqual(
- self.test.push_to_db(),
- testcase.TestCase.EX_PUSH_TO_DB_ERROR)
- mock_function.assert_not_called()
-
- @mock.patch('requests.post')
- def test_pushdb_bad_end_time(self, mock_function=None):
- self.test.stop_time = "2"
- self.assertEqual(
- self.test.push_to_db(),
- testcase.TestCase.EX_PUSH_TO_DB_ERROR)
- mock_function.assert_not_called()
-
- def _get_data(self):
- return {
- "build_tag": os.environ['BUILD_TAG'],
- "case_name": self._case_name,
- "criteria": 'PASS' if self.test.is_successful(
- ) == self.test.EX_OK else 'FAIL',
- "details": self.test.details,
- "installer": os.environ['INSTALLER_TYPE'],
- "pod_name": os.environ['NODE_NAME'],
- "project_name": self.test.project_name,
- "scenario": os.environ['DEPLOY_SCENARIO'],
- "start_date": datetime.fromtimestamp(
- self.test.start_time).strftime('%Y-%m-%d %H:%M:%S'),
- "stop_date": datetime.fromtimestamp(
- self.test.stop_time).strftime('%Y-%m-%d %H:%M:%S'),
- "version": "master"}
-
- @mock.patch('requests.post')
- def _test_pushdb_version(self, mock_function=None, **kwargs):
- payload = self._get_data()
- payload["version"] = kwargs.get("version", "unknown")
- self.assertEqual(self.test.push_to_db(), testcase.TestCase.EX_OK)
- mock_function.assert_called_once_with(
- os.environ['TEST_DB_URL'],
- data=json.dumps(payload, sort_keys=True),
- headers=self._headers)
-
- def test_pushdb_daily_job(self):
- self._test_pushdb_version(version="master")
-
- def test_pushdb_weekly_job(self):
- os.environ['BUILD_TAG'] = 'foo-weekly-master-bar'
- self._test_pushdb_version(version="master")
-
- def test_pushdb_random_build_tag(self):
- os.environ['BUILD_TAG'] = 'whatever'
- self._test_pushdb_version(version="unknown")
-
- @mock.patch('requests.post', return_value=mock.Mock(
- raise_for_status=mock.Mock(
- side_effect=requests.exceptions.HTTPError)))
- def test_pushdb_http_errors(self, mock_function=None):
- self.assertEqual(
- self.test.push_to_db(),
- testcase.TestCase.EX_PUSH_TO_DB_ERROR)
- mock_function.assert_called_once_with(
- os.environ['TEST_DB_URL'],
- data=json.dumps(self._get_data(), sort_keys=True),
- headers=self._headers)
-
- def test_check_criteria_missing(self):
- self.test.criteria = None
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_check_result_missing(self):
- self.test.result = None
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_check_result_failed(self):
- # Backward compatibility
- # It must be removed as soon as TestCase subclasses
- # stop setting result = 'PASS' or 'FAIL'.
- self.test.result = 'FAIL'
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_check_result_pass(self):
- # Backward compatibility
- # It must be removed as soon as TestCase subclasses
- # stop setting result = 'PASS' or 'FAIL'.
- self.test.result = 'PASS'
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_OK)
-
- def test_check_result_lt(self):
- self.test.result = 50
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_check_result_eq(self):
- self.test.result = 100
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_OK)
-
- def test_check_result_gt(self):
- self.test.criteria = 50
- self.test.result = 100
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_OK)
-
- def test_check_result_zero(self):
- self.test.criteria = 0
- self.test.result = 0
- self.assertEqual(self.test.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_get_duration_start_ko(self):
- self.test.start_time = None
- self.assertEqual(self.test.get_duration(), "XX:XX")
- self.test.start_time = 0
- self.assertEqual(self.test.get_duration(), "XX:XX")
-
- def test_get_duration_end_ko(self):
- self.test.stop_time = None
- self.assertEqual(self.test.get_duration(), "XX:XX")
- self.test.stop_time = 0
- self.assertEqual(self.test.get_duration(), "XX:XX")
-
- def test_get_invalid_duration(self):
- self.test.start_time = 2
- self.test.stop_time = 1
- self.assertEqual(self.test.get_duration(), "XX:XX")
-
- def test_get_zero_duration(self):
- self.test.start_time = 2
- self.test.stop_time = 2
- self.assertEqual(self.test.get_duration(), "00:00")
-
- def test_get_duration(self):
- self.test.start_time = 1
- self.test.stop_time = 180
- self.assertEqual(self.test.get_duration(), "02:59")
-
- def test_str_project_name_ko(self):
- self.test.project_name = None
- self.assertIn("<functest.core.testcase.TestCase object at",
- str(self.test))
-
- def test_str_case_name_ko(self):
- self.test.case_name = None
- self.assertIn("<functest.core.testcase.TestCase object at",
- str(self.test))
-
- def test_str_pass(self):
- duration = '01:01'
- with mock.patch.object(self.test, 'get_duration',
- return_value=duration), \
- mock.patch.object(self.test, 'is_successful',
- return_value=testcase.TestCase.EX_OK):
- message = str(self.test)
- self.assertIn(self._project_name, message)
- self.assertIn(self._case_name, message)
- self.assertIn(duration, message)
- self.assertIn('PASS', message)
-
- def test_str_fail(self):
- duration = '00:59'
- with mock.patch.object(self.test, 'get_duration',
- return_value=duration), \
- mock.patch.object(
- self.test, 'is_successful',
- return_value=testcase.TestCase.EX_TESTCASE_FAILED):
- message = str(self.test)
- self.assertIn(self._project_name, message)
- self.assertIn(self._case_name, message)
- self.assertIn(duration, message)
- self.assertIn('FAIL', message)
-
- def test_clean(self):
- self.assertEqual(self.test.clean(), None)
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_unit.py b/functest/tests/unit/core/test_unit.py
deleted file mode 100644
index ca73de67..00000000
--- a/functest/tests/unit/core/test_unit.py
+++ /dev/null
@@ -1,98 +0,0 @@
-#!/usr/bin/env python
-
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import unittest
-
-import mock
-
-from functest.core import unit
-from functest.core import testcase
-
-
-class PyTestSuiteRunnerTesting(unittest.TestCase):
-
- def setUp(self):
- self.psrunner = unit.Suite()
- self.psrunner.suite = "foo"
-
- @mock.patch('unittest.TestLoader')
- def _test_run(self, mock_class=None, result=mock.Mock(),
- status=testcase.TestCase.EX_OK):
- with mock.patch('functest.core.unit.unittest.TextTestRunner.run',
- return_value=result):
- self.assertEqual(self.psrunner.run(), status)
- mock_class.assert_not_called()
-
- def test_check_suite_null(self):
- self.assertEqual(unit.Suite().suite, None)
- self.psrunner.suite = None
- self._test_run(result=mock.Mock(),
- status=testcase.TestCase.EX_RUN_ERROR)
-
- def test_run_no_ut(self):
- mock_result = mock.Mock(testsRun=0, errors=[], failures=[])
- self._test_run(result=mock_result,
- status=testcase.TestCase.EX_RUN_ERROR)
- self.assertEqual(self.psrunner.result, 0)
- self.assertEqual(self.psrunner.details,
- {'errors': 0, 'failures': 0, 'stream': '',
- 'testsRun': 0})
- self.assertEqual(self.psrunner.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_run_result_ko(self):
- self.psrunner.criteria = 100
- mock_result = mock.Mock(testsRun=50, errors=[('test1', 'error_msg1')],
- failures=[('test2', 'failure_msg1')])
- self._test_run(result=mock_result)
- self.assertEqual(self.psrunner.result, 96)
- self.assertEqual(self.psrunner.details,
- {'errors': 1, 'failures': 1, 'stream': '',
- 'testsRun': 50})
- self.assertEqual(self.psrunner.is_successful(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_run_result_ok(self):
- mock_result = mock.Mock(testsRun=50, errors=[],
- failures=[])
- self._test_run(result=mock_result)
- self.assertEqual(self.psrunner.result, 100)
- self.assertEqual(self.psrunner.details,
- {'errors': 0, 'failures': 0, 'stream': '',
- 'testsRun': 50})
- self.assertEqual(self.psrunner.is_successful(),
- testcase.TestCase.EX_OK)
-
- @mock.patch('unittest.TestLoader')
- def test_run_name_exc(self, mock_class=None):
- mock_obj = mock.Mock(side_effect=ImportError)
- mock_class.side_effect = mock_obj
- self.assertEqual(self.psrunner.run(name='foo'),
- testcase.TestCase.EX_RUN_ERROR)
- mock_class.assert_called_once_with()
- mock_obj.assert_called_once_with()
-
- @mock.patch('unittest.TestLoader')
- def test_run_name(self, mock_class=None):
- mock_result = mock.Mock(testsRun=50, errors=[],
- failures=[])
- mock_obj = mock.Mock()
- mock_class.side_effect = mock_obj
- with mock.patch('functest.core.unit.unittest.TextTestRunner.run',
- return_value=mock_result):
- self.assertEqual(self.psrunner.run(name='foo'),
- testcase.TestCase.EX_OK)
- mock_class.assert_called_once_with()
- mock_obj.assert_called_once_with()
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/core/test_vnf.py b/functest/tests/unit/core/test_vnf.py
deleted file mode 100644
index 0ac672f6..00000000
--- a/functest/tests/unit/core/test_vnf.py
+++ /dev/null
@@ -1,187 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import unittest
-
-import mock
-
-from functest.core import vnf
-from functest.core import testcase
-from functest.utils import constants
-
-from snaps.openstack.os_credentials import OSCreds
-
-
-class VnfBaseTesting(unittest.TestCase):
- """The class testing VNF."""
- # pylint: disable=missing-docstring,too-many-public-methods
-
- tenant_name = 'test_tenant_name'
- tenant_description = 'description'
-
- def setUp(self):
- self.test = vnf.VnfOnBoarding(project='functest', case_name='foo')
-
- def test_run_deploy_orch_exc(self):
- with mock.patch.object(self.test, 'prepare'), \
- mock.patch.object(self.test, 'deploy_orchestrator',
- side_effect=Exception) as mock_method, \
- mock.patch.object(self.test, 'deploy_vnf',
- return_value=True), \
- mock.patch.object(self.test, 'test_vnf',
- return_value=True):
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_TESTCASE_FAILED)
- mock_method.assert_called_with()
-
- def test_run_deploy_vnf_exc(self):
- with mock.patch.object(self.test, 'prepare'),\
- mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=True), \
- mock.patch.object(self.test, 'deploy_vnf',
- side_effect=Exception) as mock_method:
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_TESTCASE_FAILED)
- mock_method.assert_called_with()
-
- def test_run_test_vnf_exc(self):
- with mock.patch.object(self.test, 'prepare'),\
- mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=True), \
- mock.patch.object(self.test, 'deploy_vnf', return_value=True), \
- mock.patch.object(self.test, 'test_vnf',
- side_effect=Exception) as mock_method:
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_TESTCASE_FAILED)
- mock_method.assert_called_with()
-
- def test_run_deploy_orch_ko(self):
- with mock.patch.object(self.test, 'prepare'),\
- mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=False), \
- mock.patch.object(self.test, 'deploy_vnf',
- return_value=True), \
- mock.patch.object(self.test, 'test_vnf',
- return_value=True):
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_run_vnf_deploy_ko(self):
- with mock.patch.object(self.test, 'prepare'),\
- mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=True), \
- mock.patch.object(self.test, 'deploy_vnf',
- return_value=False), \
- mock.patch.object(self.test, 'test_vnf',
- return_value=True):
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_run_vnf_test_ko(self):
- with mock.patch.object(self.test, 'prepare'),\
- mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=True), \
- mock.patch.object(self.test, 'deploy_vnf',
- return_value=True), \
- mock.patch.object(self.test, 'test_vnf',
- return_value=False):
- self.assertEqual(self.test.run(),
- testcase.TestCase.EX_TESTCASE_FAILED)
-
- def test_run_default(self):
- with mock.patch.object(self.test, 'prepare'),\
- mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=True), \
- mock.patch.object(self.test, 'deploy_vnf',
- return_value=True), \
- mock.patch.object(self.test, 'test_vnf',
- return_value=True):
- self.assertEqual(self.test.run(), testcase.TestCase.EX_OK)
-
- @mock.patch('functest.core.vnf.OpenStackUser')
- @mock.patch('functest.core.vnf.OpenStackProject')
- @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials',
- side_effect=Exception)
- def test_prepare_exc1(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
- args[0].assert_called_with(os_env_file=constants.ENV_FILE)
- args[1].assert_not_called()
- args[2].assert_not_called()
-
- @mock.patch('functest.core.vnf.OpenStackUser')
- @mock.patch('functest.core.vnf.OpenStackProject', side_effect=Exception)
- @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials')
- def test_prepare_exc2(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
- args[0].assert_called_with(os_env_file=constants.ENV_FILE)
- args[1].assert_called_with(mock.ANY, mock.ANY)
- args[2].assert_not_called()
-
- @mock.patch('functest.core.vnf.OpenStackUser', side_effect=Exception)
- @mock.patch('functest.core.vnf.OpenStackProject')
- @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials')
- def test_prepare_exc3(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
- args[0].assert_called_with(os_env_file=constants.ENV_FILE)
- args[1].assert_called_with(mock.ANY, mock.ANY)
- args[2].assert_called_with(mock.ANY, mock.ANY)
-
- @mock.patch('functest.core.vnf.OpenStackUser')
- @mock.patch('functest.core.vnf.OpenStackProject')
- @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials')
- def test_prepare_default(self, *args):
- self.assertEqual(self.test.prepare(), testcase.TestCase.EX_OK)
- args[0].assert_called_with(os_env_file=constants.ENV_FILE)
- args[1].assert_called_with(mock.ANY, mock.ANY)
- args[2].assert_called_with(mock.ANY, mock.ANY)
-
- def test_deploy_vnf_unimplemented(self):
- with self.assertRaises(vnf.VnfDeploymentException):
- self.test.deploy_vnf()
-
- def test_test_vnf_unimplemented(self):
- with self.assertRaises(vnf.VnfTestException):
- self.test.test_vnf()
-
- def test_deploy_orch_unimplemented(self):
- self.assertTrue(self.test.deploy_orchestrator())
-
- @mock.patch('snaps.openstack.tests.openstack_tests.get_credentials',
- return_value=OSCreds(
- username='user', password='pass',
- auth_url='http://foo.com:5000/v3', project_name='bar'),
- side_effect=Exception)
- def test_prepare_keystone_client_ko(self, *args):
- with self.assertRaises(vnf.VnfPreparationException):
- self.test.prepare()
- args[0].assert_called_once()
-
- def test_vnf_clean_exc(self):
- obj = mock.Mock()
- obj.clean.side_effect = Exception
- self.test.created_object = [obj]
- self.test.clean()
- obj.clean.assert_called_with()
-
- def test_vnf_clean(self):
- obj = mock.Mock()
- self.test.created_object = [obj]
- self.test.clean()
- obj.clean.assert_called_with()
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/energy/__init__.py b/functest/tests/unit/energy/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/tests/unit/energy/__init__.py
+++ /dev/null
diff --git a/functest/tests/unit/energy/test_functest_energy.py b/functest/tests/unit/energy/test_functest_energy.py
deleted file mode 100644
index fd110432..00000000
--- a/functest/tests/unit/energy/test_functest_energy.py
+++ /dev/null
@@ -1,371 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Unitary test for energy module."""
-# pylint: disable=unused-argument
-import logging
-import os
-import unittest
-
-import mock
-import requests
-
-from functest.energy.energy import EnergyRecorder
-import functest.energy.energy as energy
-
-CASE_NAME = "UNIT_TEST_CASE"
-STEP_NAME = "UNIT_TEST_STEP"
-
-PREVIOUS_SCENARIO = "previous_scenario"
-PREVIOUS_STEP = "previous_step"
-
-
-class MockHttpResponse(object): # pylint: disable=too-few-public-methods
- """Mock response for Energy recorder API."""
-
- def __init__(self, text, status_code):
- """Create an instance of MockHttpResponse."""
- self.text = text
- self.status_code = status_code
-
-
-API_OK = MockHttpResponse(
- '{"status": "OK"}',
- 200
-)
-API_KO = MockHttpResponse(
- '{"message": "API-KO"}',
- 500
-)
-
-RECORDER_OK = MockHttpResponse(
- '{"environment": "UNIT_TEST",'
- ' "step": "string",'
- ' "scenario": "' + CASE_NAME + '"}',
- 200
-)
-RECORDER_KO = MockHttpResponse(
- '{"message": "An unhandled API exception occurred (MOCK)"}',
- 500
-)
-RECORDER_NOT_FOUND = MockHttpResponse(
- '{"message": "Recorder not found (MOCK)"}',
- 404
-)
-
-
-# pylint: disable=too-many-public-methods
-class EnergyRecorderTest(unittest.TestCase):
- """Energy module unitary test suite."""
-
- case_name = CASE_NAME
- request_headers = {'content-type': 'application/json'}
- returned_value_to_preserve = "value"
- exception_message_to_preserve = "exception_message"
-
- @staticmethod
- def _set_env_creds():
- """Set config values."""
- os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
- os.environ["ENERGY_RECORDER_API_USER"] = "user"
- os.environ["ENERGY_RECORDER_API_PASSWORD"] = "password"
-
- @staticmethod
- def _set_env_nocreds():
- """Set config values."""
- os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
- del os.environ["ENERGY_RECORDER_API_USER"]
- del os.environ["ENERGY_RECORDER_API_PASSWORD"]
-
- @mock.patch('functest.energy.energy.requests.post',
- return_value=RECORDER_OK)
- def test_start(self, post_mock=None, get_mock=None):
- """EnergyRecorder.start method (regular case)."""
- self.test_load_config()
- self.assertTrue(EnergyRecorder.start(self.case_name))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.requests.post',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_start_error(self, post_mock=None):
- """EnergyRecorder.start method (error in method)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.start(self.case_name))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.EnergyRecorder.load_config',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_start_exception(self, conf_loader_mock=None):
- """EnergyRecorder.start test with exception during execution."""
- start_status = EnergyRecorder.start(CASE_NAME)
- self.assertFalse(start_status)
-
- @mock.patch('functest.energy.energy.requests.post',
- return_value=RECORDER_KO)
- def test_start_api_error(self, post_mock=None):
- """EnergyRecorder.start method (API error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.start(self.case_name))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.requests.post',
- return_value=RECORDER_OK)
- def test_set_step(self, post_mock=None):
- """EnergyRecorder.set_step method (regular case)."""
- self.test_load_config()
- self.assertTrue(EnergyRecorder.set_step(STEP_NAME))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.requests.post',
- return_value=RECORDER_KO)
- def test_set_step_api_error(self, post_mock=None):
- """EnergyRecorder.set_step method (API error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.requests.post',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_set_step_error(self, post_mock=None):
- """EnergyRecorder.set_step method (method error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.EnergyRecorder.load_config',
- side_effect=requests.exceptions.ConnectionError())
- def test_set_step_connection_error(self, conf_loader_mock=None):
- """EnergyRecorder.start test with exception during execution."""
- step_status = EnergyRecorder.set_step(STEP_NAME)
- self.assertFalse(step_status)
-
- @mock.patch('functest.energy.energy.requests.delete',
- return_value=RECORDER_OK)
- def test_stop(self, delete_mock=None):
- """EnergyRecorder.stop method (regular case)."""
- self.test_load_config()
- self.assertTrue(EnergyRecorder.stop())
- delete_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.requests.delete',
- return_value=RECORDER_KO)
- def test_stop_api_error(self, delete_mock=None):
- """EnergyRecorder.stop method (API Error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.stop())
- delete_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('functest.energy.energy.requests.delete',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_stop_error(self, delete_mock=None):
- """EnergyRecorder.stop method (method error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.stop())
- delete_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @energy.enable_recording
- def __decorated_method(self):
- """Call with to energy recorder decorators."""
- return self.returned_value_to_preserve
-
- @energy.enable_recording
- def __decorated_method_with_ex(self):
- """Call with to energy recorder decorators."""
- raise Exception(self.exception_message_to_preserve)
-
- @mock.patch("functest.energy.energy.EnergyRecorder.get_current_scenario",
- return_value=None)
- @mock.patch("functest.energy.energy.EnergyRecorder")
- def test_decorators(self,
- recorder_mock=None,
- cur_scenario_mock=None):
- """Test energy module decorators."""
- self.__decorated_method()
- calls = [mock.call.start(self.case_name),
- mock.call.stop()]
- recorder_mock.assert_has_calls(calls)
-
- @mock.patch("functest.energy.energy.EnergyRecorder.get_current_scenario",
- return_value={"scenario": PREVIOUS_SCENARIO,
- "step": PREVIOUS_STEP})
- @mock.patch("functest.energy.energy.EnergyRecorder")
- def test_decorators_with_previous(self,
- recorder_mock=None,
- cur_scenario_mock=None):
- """Test energy module decorators."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_creds()
- self.__decorated_method()
- calls = [mock.call.start(self.case_name),
- mock.call.submit_scenario(PREVIOUS_SCENARIO,
- PREVIOUS_STEP)]
- recorder_mock.assert_has_calls(calls, True)
-
- def test_decorator_preserve_return(self):
- """Test that decorator preserve method returned value."""
- self.test_load_config()
- self.assertTrue(
- self.__decorated_method() == self.returned_value_to_preserve
- )
-
- @mock.patch(
- "functest.energy.energy.finish_session")
- def test_decorator_preserve_ex(self, finish_mock=None):
- """Test that decorator preserve method exceptions."""
- self.test_load_config()
- with self.assertRaises(Exception) as context:
- self.__decorated_method_with_ex()
- self.assertTrue(
- self.exception_message_to_preserve in str(context.exception)
- )
- self.assertTrue(finish_mock.called)
-
- @mock.patch("functest.energy.energy.requests.get",
- return_value=API_OK)
- def test_load_config(self, loader_mock=None, get_mock=None):
- """Test load config."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_creds()
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
-
- self.assertEquals(
- EnergyRecorder.energy_recorder_api["auth"],
- ("user", "password")
- )
- self.assertEquals(
- EnergyRecorder.energy_recorder_api["uri"],
- "http://pod-uri:8888/recorders/environment/MOCK_POD"
- )
-
- @mock.patch("functest.energy.energy.requests.get",
- return_value=API_OK)
- def test_load_config_no_creds(self, loader_mock=None, get_mock=None):
- """Test load config without creds."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_nocreds()
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
- self.assertEquals(EnergyRecorder.energy_recorder_api["auth"], None)
- self.assertEquals(
- EnergyRecorder.energy_recorder_api["uri"],
- "http://pod-uri:8888/recorders/environment/MOCK_POD"
- )
-
- @mock.patch("functest.energy.energy.requests.get",
- return_value=API_OK)
- def test_load_config_ex(self, loader_mock=None, get_mock=None):
- """Test load config with exception."""
- for key in ['NODE_NAME', 'ENERGY_RECORDER_API_URL']:
- os.environ[key] = ''
- with self.assertRaises(AssertionError):
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
- self.assertEquals(EnergyRecorder.energy_recorder_api, None)
-
- @mock.patch("functest.energy.energy.requests.get",
- return_value=API_KO)
- def test_load_config_api_ko(self, loader_mock=None, get_mock=None):
- """Test load config with API unavailable."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_creds()
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
- self.assertEquals(EnergyRecorder.energy_recorder_api["available"],
- False)
-
- @mock.patch('functest.energy.energy.requests.get',
- return_value=RECORDER_OK)
- def test_get_current_scenario(self, loader_mock=None, get_mock=None):
- """Test get_current_scenario."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self.test_load_config()
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is not None)
-
- @mock.patch('functest.energy.energy.requests.get',
- return_value=RECORDER_NOT_FOUND)
- def test_current_scenario_not_found(self, get_mock=None):
- """Test get current scenario not existing."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self.test_load_config()
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is None)
-
- @mock.patch('functest.energy.energy.requests.get',
- return_value=RECORDER_KO)
- def test_current_scenario_api_error(self, get_mock=None):
- """Test get current scenario with API error."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self.test_load_config()
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is None)
-
- @mock.patch('functest.energy.energy.EnergyRecorder.load_config',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_current_scenario_exception(self, get_mock=None):
- """Test get current scenario with exception."""
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is None)
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/__init__.py b/functest/tests/unit/utils/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/tests/unit/utils/__init__.py
+++ /dev/null
diff --git a/functest/tests/unit/utils/test_decorators.py b/functest/tests/unit/utils/test_decorators.py
deleted file mode 100644
index b4cdf6ff..00000000
--- a/functest/tests/unit/utils/test_decorators.py
+++ /dev/null
@@ -1,125 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Define the class required to fully cover decorators."""
-
-from datetime import datetime
-import errno
-import json
-import logging
-import os
-import unittest
-
-import mock
-
-from functest.core import testcase
-from functest.utils import decorators
-
-__author__ = "Cedric Ollivier <cedric.ollivier@orange.com>"
-
-DIR = '/dev'
-FILE = '{}/null'.format(DIR)
-URL = 'file://{}'.format(FILE)
-
-
-class DecoratorsTesting(unittest.TestCase):
- # pylint: disable=missing-docstring
-
- _case_name = 'base'
- _project_name = 'functest'
- _start_time = 1.0
- _stop_time = 2.0
- _result = 'PASS'
- _version = 'unknown'
- _build_tag = 'none'
- _node_name = 'bar'
- _deploy_scenario = 'foo'
- _installer_type = 'debian'
-
- def setUp(self):
- os.environ['INSTALLER_TYPE'] = self._installer_type
- os.environ['DEPLOY_SCENARIO'] = self._deploy_scenario
- os.environ['NODE_NAME'] = self._node_name
- os.environ['BUILD_TAG'] = self._build_tag
-
- def test_wraps(self):
- self.assertEqual(testcase.TestCase.push_to_db.__name__,
- "push_to_db")
-
- def _get_json(self):
- stop_time = datetime.fromtimestamp(self._stop_time).strftime(
- '%Y-%m-%d %H:%M:%S')
- start_time = datetime.fromtimestamp(self._start_time).strftime(
- '%Y-%m-%d %H:%M:%S')
- data = {'project_name': self._project_name,
- 'stop_date': stop_time, 'start_date': start_time,
- 'case_name': self._case_name, 'build_tag': self._build_tag,
- 'pod_name': self._node_name, 'installer': self._installer_type,
- 'scenario': self._deploy_scenario, 'version': self._version,
- 'details': {}, 'criteria': self._result}
- return json.dumps(data, sort_keys=True)
-
- def _get_testcase(self):
- test = testcase.TestCase(
- project_name=self._project_name, case_name=self._case_name)
- test.start_time = self._start_time
- test.stop_time = self._stop_time
- test.result = 100
- test.details = {}
- return test
-
- @mock.patch('requests.post')
- def test_http_shema(self, *args):
- os.environ['TEST_DB_URL'] = 'http://127.0.0.1'
- test = self._get_testcase()
- self.assertEqual(test.push_to_db(), testcase.TestCase.EX_OK)
- args[0].assert_called_once_with(
- 'http://127.0.0.1', data=self._get_json(),
- headers={'Content-Type': 'application/json'})
-
- def test_wrong_shema(self):
- os.environ['TEST_DB_URL'] = '/dev/null'
- test = self._get_testcase()
- self.assertEqual(
- test.push_to_db(), testcase.TestCase.EX_PUSH_TO_DB_ERROR)
-
- def _test_dump(self):
- os.environ['TEST_DB_URL'] = URL
- with mock.patch.object(decorators, 'open', mock.mock_open(),
- create=True) as mock_open:
- test = self._get_testcase()
- self.assertEqual(test.push_to_db(), testcase.TestCase.EX_OK)
- mock_open.assert_called_once_with(FILE, 'a')
- handle = mock_open()
- call_args, _ = handle.write.call_args
- self.assertIn('POST', call_args[0])
- self.assertIn(self._get_json(), call_args[0])
-
- @mock.patch('os.makedirs')
- def test_default_dump(self, mock_method=None):
- self._test_dump()
- mock_method.assert_called_once_with(DIR)
-
- @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
- def test_makedirs_dir_exists(self, mock_method=None):
- self._test_dump()
- mock_method.assert_called_once_with(DIR)
-
- @mock.patch('os.makedirs', side_effect=OSError)
- def test_makedirs_exc(self, *args):
- os.environ['TEST_DB_URL'] = URL
- test = self._get_testcase()
- self.assertEqual(
- test.push_to_db(), testcase.TestCase.EX_PUSH_TO_DB_ERROR)
- args[0].assert_called_once_with(DIR)
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/tests/unit/utils/test_env.py b/functest/tests/unit/utils/test_env.py
deleted file mode 100644
index 49d2d974..00000000
--- a/functest/tests/unit/utils/test_env.py
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2018 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import logging
-import os
-import unittest
-
-from six.moves import reload_module
-
-from functest.utils import env
-
-
-class EnvTesting(unittest.TestCase):
- # pylint: disable=missing-docstring
-
- def setUp(self):
- os.environ['FOO'] = 'foo'
- os.environ['BUILD_TAG'] = 'master'
- os.environ['CI_LOOP'] = 'weekly'
-
- def test_get_unset_unknown_env(self):
- del os.environ['FOO']
- self.assertEqual(env.get('FOO'), None)
-
- def test_get_unknown_env(self):
- self.assertEqual(env.get('FOO'), 'foo')
- reload_module(env)
-
- def test_get_unset_env(self):
- del os.environ['CI_LOOP']
- self.assertEqual(
- env.get('CI_LOOP'), env.INPUTS['CI_LOOP'])
-
- def test_get_env(self):
- self.assertEqual(
- env.get('CI_LOOP'), 'weekly')
-
- def test_get_unset_env2(self):
- del os.environ['BUILD_TAG']
- self.assertEqual(
- env.get('BUILD_TAG'), env.INPUTS['BUILD_TAG'])
-
- def test_get_env2(self):
- self.assertEqual(env.get('BUILD_TAG'), 'master')
-
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/functest/utils/__init__.py b/functest/utils/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/utils/__init__.py
+++ /dev/null
diff --git a/functest/utils/constants.py b/functest/utils/constants.py
deleted file mode 100644
index 0bc00d80..00000000
--- a/functest/utils/constants.py
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env python
-
-# pylint: disable=missing-docstring
-
-import pkg_resources
-
-CONFIG_FUNCTEST_YAML = pkg_resources.resource_filename(
- 'functest', 'ci/config_functest.yaml')
-
-ENV_FILE = '/home/opnfv/functest/conf/env_file'
diff --git a/functest/utils/decorators.py b/functest/utils/decorators.py
deleted file mode 100644
index 230a99e7..00000000
--- a/functest/utils/decorators.py
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import errno
-import functools
-import os
-
-import mock
-import requests.sessions
-from six.moves import urllib
-
-
-def can_dump_request_to_file(method):
-
- def dump_preparedrequest(request, **kwargs):
- # pylint: disable=unused-argument
- parseresult = urllib.parse.urlparse(request.url)
- if parseresult.scheme == "file":
- try:
- dirname = os.path.dirname(parseresult.path)
- os.makedirs(dirname)
- except OSError as ex:
- if ex.errno != errno.EEXIST:
- raise
- with open(parseresult.path, 'a') as dumpfile:
- headers = ""
- for key in request.headers:
- headers += key + " " + request.headers[key] + "\n"
- message = "{} {}\n{}\n{}\n\n\n".format(
- request.method, request.url, headers, request.body)
- dumpfile.write(message)
- return mock.Mock()
-
- def patch_request(method, url, **kwargs):
- with requests.sessions.Session() as session:
- parseresult = urllib.parse.urlparse(url)
- if parseresult.scheme == "file":
- with mock.patch.object(session, 'send',
- side_effect=dump_preparedrequest):
- return session.request(method=method, url=url, **kwargs)
- else:
- return session.request(method=method, url=url, **kwargs)
-
- @functools.wraps(method)
- def hook(*args, **kwargs):
- with mock.patch('requests.api.request', side_effect=patch_request):
- return method(*args, **kwargs)
-
- return hook
diff --git a/functest/utils/env.py b/functest/utils/env.py
deleted file mode 100644
index aa2da0b5..00000000
--- a/functest/utils/env.py
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2018 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# pylint: disable=missing-docstring
-
-import os
-
-import prettytable
-
-INPUTS = {
- 'EXTERNAL_NETWORK': None,
- 'CI_LOOP': 'daily',
- 'DEPLOY_SCENARIO': 'os-nosdn-nofeature-noha',
- 'INSTALLER_TYPE': None,
- 'SDN_CONTROLLER_IP': None,
- 'BUILD_TAG': None,
- 'NODE_NAME': None,
- 'POD_ARCH': None,
- 'TEST_DB_URL': 'http://testresults.opnfv.org/test/api/v1/results',
- 'ENERGY_RECORDER_API_URL': 'http://energy.opnfv.fr/resources',
- 'ENERGY_RECORDER_API_USER': None,
- 'ENERGY_RECORDER_API_PASSWORD': None
-}
-
-
-def get(env_var):
- if env_var not in INPUTS.keys():
- return os.environ.get(env_var, None)
- return os.environ.get(env_var, INPUTS[env_var])
-
-
-def string():
- msg = prettytable.PrettyTable(
- header_style='upper', padding_width=5,
- field_names=['env var', 'value'])
- for env_var in INPUTS:
- msg.add_row([env_var, get(env_var) if get(env_var) else ''])
- return msg