diff options
Diffstat (limited to 'testcases/integration.py')
-rw-r--r-- | testcases/integration.py | 201 |
1 files changed, 8 insertions, 193 deletions
diff --git a/testcases/integration.py b/testcases/integration.py index bec38624..f2a5fecf 100644 --- a/testcases/integration.py +++ b/testcases/integration.py @@ -14,22 +14,10 @@ """IntegrationTestCase class """ -import os -import time import logging -import copy -import re from collections import OrderedDict from testcases import TestCase -from conf import settings as S -from tools import namespace -from tools import veth -from tools.teststepstools import TestStepsTools -from core.loader import Loader - -CHECK_PREFIX = 'validate_' - class IntegrationTestCase(TestCase): """IntegrationTestCase class @@ -41,193 +29,20 @@ class IntegrationTestCase(TestCase): self._type = 'integration' super(IntegrationTestCase, self).__init__(cfg) self._logger = logging.getLogger(__name__) - self._inttest = None - - def report_status(self, label, status): - """ Log status of test step - """ - self._logger.info("%s ... %s", label, 'OK' if status else 'FAILED') - - def run_initialize(self): - """ Prepare test execution environment - """ - super(IntegrationTestCase, self).run_initialize() - self._inttest = {'status' : True, 'details' : ''} - - def run(self): - """Run the test - - All setup and teardown through controllers is included. - """ - def eval_step_params(params, step_result): - """ Evaluates referrences to results from previous steps - """ - def eval_param(param, STEP): - # pylint: disable=invalid-name - """ Helper function - """ - if isinstance(param, str): - # evaluate every #STEP reference inside parameter itself - macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param) - if macros: - for macro in macros: - # pylint: disable=eval-used - tmp_val = str(eval(macro[1:])) - param = param.replace(macro, tmp_val) - return param - elif isinstance(param, list) or isinstance(param, tuple): - tmp_list = [] - for item in param: - tmp_list.append(eval_param(item, STEP)) - return tmp_list - elif isinstance(param, dict): - tmp_dict = {} - for (key, value) in param.items(): - tmp_dict[key] = eval_param(value, STEP) - return tmp_dict - else: - return param - - eval_params = [] - # evaluate all parameters if needed - for param in params: - eval_params.append(eval_param(param, step_result)) - return eval_params - - # prepare test execution environment - self.run_initialize() - - try: - with self._vswitch_ctl, self._loadgen: - with self._vnf_ctl, self._collector: - if not self._vswitch_none: - self._add_flows() - - # run traffic generator if requested, otherwise wait for manual termination - if S.getValue('mode') == 'trafficgen-off': - time.sleep(2) - self._logger.debug("All is set. Please run traffic generator manually.") - input(os.linesep + "Press Enter to terminate vswitchperf..." + os.linesep + os.linesep) - else: - with self._traffic_ctl: - if not self.test: - self._traffic_ctl.send_traffic(self._traffic) - else: - vnf_list = {} - loader = Loader() - # execute test based on TestSteps definition - if self.test: - # initialize list with results - step_result = [None] * len(self.test) - - # count how many VNFs are involved in the test - for step in self.test: - if step[0].startswith('vnf'): - vnf_list[step[0]] = None - - # check/expand GUEST configuration and copy data to shares - if len(vnf_list): - S.check_vm_settings(len(vnf_list)) - self._copy_fwd_tools_for_all_guests(len(vnf_list)) - - # run test step by step... - for i, step in enumerate(self.test): - step_ok = False - if step[0] == 'vswitch': - test_object = self._vswitch_ctl.get_vswitch() - elif step[0] == 'namespace': - test_object = namespace - elif step[0] == 'veth': - test_object = veth - elif step[0] == 'settings': - test_object = S - elif step[0] == 'tools': - test_object = TestStepsTools() - step[1] = step[1].title() - elif step[0] == 'trafficgen': - test_object = self._traffic_ctl - # in case of send_traffic method, ensure that specified - # traffic values are merged with existing self._traffic - if step[1] == 'send_traffic': - tmp_traffic = copy.deepcopy(self._traffic) - tmp_traffic.update(step[2]) - step[2] = tmp_traffic - elif step[0].startswith('vnf'): - if not vnf_list[step[0]]: - # initialize new VM - vnf_list[step[0]] = loader.get_vnf_class()() - test_object = vnf_list[step[0]] - elif step[0] == 'wait': - input(os.linesep + "Step {}: Press Enter to continue with " - "the next step...".format(i) + os.linesep + os.linesep) - continue - else: - self._logger.error("Unsupported test object %s", step[0]) - self._inttest = {'status' : False, 'details' : ' '.join(step)} - self.report_status("Step '{}'".format(' '.join(step)), - self._inttest['status']) - break - - test_method = getattr(test_object, step[1]) - test_method_check = getattr(test_object, CHECK_PREFIX + step[1]) - - step_params = [] - if test_method and test_method_check and \ - callable(test_method) and callable(test_method_check): - - try: - # eval parameters, but use only valid step_results - # to support negative indexes - step_params = eval_step_params(step[2:], step_result[:i]) - step_log = '{} {}'.format(' '.join(step[:2]), step_params) - step_result[i] = test_method(*step_params) - self._logger.debug("Step %s '%s' results '%s'", i, - step_log, step_result[i]) - time.sleep(5) - step_ok = test_method_check(step_result[i], *step_params) - except AssertionError: - self._inttest = {'status' : False, 'details' : step_log} - self._logger.error("Step %s raised assertion error", i) - # stop vnfs in case of error - for vnf in vnf_list: - vnf_list[vnf].stop() - break - except IndexError: - self._inttest = {'status' : False, 'details' : step_log} - self._logger.error("Step %s result index error %s", i, - ' '.join(step[2:])) - # stop vnfs in case of error - for vnf in vnf_list: - vnf_list[vnf].stop() - break - - self.report_status("Step {} - '{}'".format(i, step_log), step_ok) - if not step_ok: - self._inttest = {'status' : False, 'details' : step_log} - # stop vnfs in case of error - for vnf in vnf_list: - vnf_list[vnf].stop() - break - - # dump vswitch flows before they are affected by VNF termination - if not self._vswitch_none: - self._vswitch_ctl.dump_vswitch_flows() - finally: - # tear down test execution environment and log results - self.run_finalize() - - # report test results - self.run_report() + # enforce check of step result for step driven testcases + self._step_check = True def run_report(self): """ Report test results """ if self.test: results = OrderedDict() - results['status'] = 'OK' if self._inttest['status'] else 'FAILED' - results['details'] = self._inttest['details'] + results['status'] = 'OK' if self._step_status['status'] else 'FAILED' + results['details'] = self._step_status['details'] TestCase.write_result_to_file([results], self._output_file) - self.report_status("Test '{}'".format(self.name), self._inttest['status']) + self.step_report_status("Test '{}'".format(self.name), self._step_status['status']) # inform vsperf about testcase failure - if not self._inttest['status']: + if not self._step_status['status']: raise Exception + else: + super(IntegrationTestCase, self).run_report() |