aboutsummaryrefslogtreecommitdiffstats
path: root/testcases/integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'testcases/integration.py')
-rw-r--r--testcases/integration.py201
1 files changed, 8 insertions, 193 deletions
diff --git a/testcases/integration.py b/testcases/integration.py
index bec38624..f2a5fecf 100644
--- a/testcases/integration.py
+++ b/testcases/integration.py
@@ -14,22 +14,10 @@
"""IntegrationTestCase class
"""
-import os
-import time
import logging
-import copy
-import re
from collections import OrderedDict
from testcases import TestCase
-from conf import settings as S
-from tools import namespace
-from tools import veth
-from tools.teststepstools import TestStepsTools
-from core.loader import Loader
-
-CHECK_PREFIX = 'validate_'
-
class IntegrationTestCase(TestCase):
"""IntegrationTestCase class
@@ -41,193 +29,20 @@ class IntegrationTestCase(TestCase):
self._type = 'integration'
super(IntegrationTestCase, self).__init__(cfg)
self._logger = logging.getLogger(__name__)
- self._inttest = None
-
- def report_status(self, label, status):
- """ Log status of test step
- """
- self._logger.info("%s ... %s", label, 'OK' if status else 'FAILED')
-
- def run_initialize(self):
- """ Prepare test execution environment
- """
- super(IntegrationTestCase, self).run_initialize()
- self._inttest = {'status' : True, 'details' : ''}
-
- def run(self):
- """Run the test
-
- All setup and teardown through controllers is included.
- """
- def eval_step_params(params, step_result):
- """ Evaluates referrences to results from previous steps
- """
- def eval_param(param, STEP):
- # pylint: disable=invalid-name
- """ Helper function
- """
- if isinstance(param, str):
- # evaluate every #STEP reference inside parameter itself
- macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param)
- if macros:
- for macro in macros:
- # pylint: disable=eval-used
- tmp_val = str(eval(macro[1:]))
- param = param.replace(macro, tmp_val)
- return param
- elif isinstance(param, list) or isinstance(param, tuple):
- tmp_list = []
- for item in param:
- tmp_list.append(eval_param(item, STEP))
- return tmp_list
- elif isinstance(param, dict):
- tmp_dict = {}
- for (key, value) in param.items():
- tmp_dict[key] = eval_param(value, STEP)
- return tmp_dict
- else:
- return param
-
- eval_params = []
- # evaluate all parameters if needed
- for param in params:
- eval_params.append(eval_param(param, step_result))
- return eval_params
-
- # prepare test execution environment
- self.run_initialize()
-
- try:
- with self._vswitch_ctl, self._loadgen:
- with self._vnf_ctl, self._collector:
- if not self._vswitch_none:
- self._add_flows()
-
- # run traffic generator if requested, otherwise wait for manual termination
- if S.getValue('mode') == 'trafficgen-off':
- time.sleep(2)
- self._logger.debug("All is set. Please run traffic generator manually.")
- input(os.linesep + "Press Enter to terminate vswitchperf..." + os.linesep + os.linesep)
- else:
- with self._traffic_ctl:
- if not self.test:
- self._traffic_ctl.send_traffic(self._traffic)
- else:
- vnf_list = {}
- loader = Loader()
- # execute test based on TestSteps definition
- if self.test:
- # initialize list with results
- step_result = [None] * len(self.test)
-
- # count how many VNFs are involved in the test
- for step in self.test:
- if step[0].startswith('vnf'):
- vnf_list[step[0]] = None
-
- # check/expand GUEST configuration and copy data to shares
- if len(vnf_list):
- S.check_vm_settings(len(vnf_list))
- self._copy_fwd_tools_for_all_guests(len(vnf_list))
-
- # run test step by step...
- for i, step in enumerate(self.test):
- step_ok = False
- if step[0] == 'vswitch':
- test_object = self._vswitch_ctl.get_vswitch()
- elif step[0] == 'namespace':
- test_object = namespace
- elif step[0] == 'veth':
- test_object = veth
- elif step[0] == 'settings':
- test_object = S
- elif step[0] == 'tools':
- test_object = TestStepsTools()
- step[1] = step[1].title()
- elif step[0] == 'trafficgen':
- test_object = self._traffic_ctl
- # in case of send_traffic method, ensure that specified
- # traffic values are merged with existing self._traffic
- if step[1] == 'send_traffic':
- tmp_traffic = copy.deepcopy(self._traffic)
- tmp_traffic.update(step[2])
- step[2] = tmp_traffic
- elif step[0].startswith('vnf'):
- if not vnf_list[step[0]]:
- # initialize new VM
- vnf_list[step[0]] = loader.get_vnf_class()()
- test_object = vnf_list[step[0]]
- elif step[0] == 'wait':
- input(os.linesep + "Step {}: Press Enter to continue with "
- "the next step...".format(i) + os.linesep + os.linesep)
- continue
- else:
- self._logger.error("Unsupported test object %s", step[0])
- self._inttest = {'status' : False, 'details' : ' '.join(step)}
- self.report_status("Step '{}'".format(' '.join(step)),
- self._inttest['status'])
- break
-
- test_method = getattr(test_object, step[1])
- test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
-
- step_params = []
- if test_method and test_method_check and \
- callable(test_method) and callable(test_method_check):
-
- try:
- # eval parameters, but use only valid step_results
- # to support negative indexes
- step_params = eval_step_params(step[2:], step_result[:i])
- step_log = '{} {}'.format(' '.join(step[:2]), step_params)
- step_result[i] = test_method(*step_params)
- self._logger.debug("Step %s '%s' results '%s'", i,
- step_log, step_result[i])
- time.sleep(5)
- step_ok = test_method_check(step_result[i], *step_params)
- except AssertionError:
- self._inttest = {'status' : False, 'details' : step_log}
- self._logger.error("Step %s raised assertion error", i)
- # stop vnfs in case of error
- for vnf in vnf_list:
- vnf_list[vnf].stop()
- break
- except IndexError:
- self._inttest = {'status' : False, 'details' : step_log}
- self._logger.error("Step %s result index error %s", i,
- ' '.join(step[2:]))
- # stop vnfs in case of error
- for vnf in vnf_list:
- vnf_list[vnf].stop()
- break
-
- self.report_status("Step {} - '{}'".format(i, step_log), step_ok)
- if not step_ok:
- self._inttest = {'status' : False, 'details' : step_log}
- # stop vnfs in case of error
- for vnf in vnf_list:
- vnf_list[vnf].stop()
- break
-
- # dump vswitch flows before they are affected by VNF termination
- if not self._vswitch_none:
- self._vswitch_ctl.dump_vswitch_flows()
- finally:
- # tear down test execution environment and log results
- self.run_finalize()
-
- # report test results
- self.run_report()
+ # enforce check of step result for step driven testcases
+ self._step_check = True
def run_report(self):
""" Report test results
"""
if self.test:
results = OrderedDict()
- results['status'] = 'OK' if self._inttest['status'] else 'FAILED'
- results['details'] = self._inttest['details']
+ results['status'] = 'OK' if self._step_status['status'] else 'FAILED'
+ results['details'] = self._step_status['details']
TestCase.write_result_to_file([results], self._output_file)
- self.report_status("Test '{}'".format(self.name), self._inttest['status'])
+ self.step_report_status("Test '{}'".format(self.name), self._step_status['status'])
# inform vsperf about testcase failure
- if not self._inttest['status']:
+ if not self._step_status['status']:
raise Exception
+ else:
+ super(IntegrationTestCase, self).run_report()